summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2011-11-20 23:37:48 +0000
committerKeith Wall <kwall@apache.org>2011-11-20 23:37:48 +0000
commita0fccd9dbfabd912a906225f817cd00072d7fc8d (patch)
tree71d457039d047e12ee5bec2f6cc98f457f050bec /java
parent33db3cab8c1ec8d82045e557b190a98a2418c565 (diff)
downloadqpid-python-a0fccd9dbfabd912a906225f817cd00072d7fc8d.tar.gz
QPID-3622: Add Sorted Queue funtionality
Applied patch from Andrew MacBean <andymacbean@gmail.com> and Oleksandr Rudyy<orudyy@gmail.com> git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1204295 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java52
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java53
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java38
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java65
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java71
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java83
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java30
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryImpl.java143
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java665
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java19
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java13
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java15
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java33
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java8
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java9
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java10
-rwxr-xr-xjava/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java22
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java (renamed from java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java)126
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java190
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryTest.java97
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java160
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java17
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java59
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java153
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java62
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java323
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java6
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java (renamed from java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java)66
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java576
-rw-r--r--java/test-profiles/JavaPre010Excludes1
36 files changed, 2749 insertions, 517 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
index 4512de6fb4..31c683b548 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
@@ -62,7 +62,8 @@ public class QueueConfiguration extends ConfigurationPlugin
"capacity",
"flowResumeCapacity",
"lvq",
- "lvqKey"
+ "lvqKey",
+ "sortKey"
};
}
@@ -167,6 +168,10 @@ public class QueueConfiguration extends ConfigurationPlugin
return getStringValue("lvqKey", null);
}
+ public String getQueueSortKey()
+ {
+ return getStringValue("sortKey", null);
+ }
public static class QueueConfig extends ConfigurationPlugin
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
index 371ae0de50..2c04a626ff 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
@@ -20,71 +20,25 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionList;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
import java.util.Map;
+import org.apache.qpid.server.virtualhost.VirtualHost;
-public class AMQPriorityQueue extends SimpleAMQQueue
+public class AMQPriorityQueue extends OutOfOrderQueue
{
- protected AMQPriorityQueue(final AMQShortString name,
- final boolean durable,
- final AMQShortString owner,
- final boolean autoDelete,
- boolean exclusive,
- final VirtualHost virtualHost,
- int priorities, Map<String, Object> arguments)
- {
- super(name, durable, owner, autoDelete, exclusive, virtualHost,new PriorityQueueList.Factory(priorities), arguments);
- }
-
- public AMQPriorityQueue(String queueName,
- boolean durable,
- String owner,
- boolean autoDelete,
- boolean exclusive, VirtualHost virtualHost, int priorities, Map<String,Object> arguments)
+ protected AMQPriorityQueue(final String name,
+ final boolean durable,
+ final String owner,
+ final boolean autoDelete,
+ boolean exclusive,
+ final VirtualHost virtualHost,
+ Map<String, Object> arguments,
+ int priorities)
{
- this(queueName == null ? null : new AMQShortString(queueName), durable, owner == null ? null : new AMQShortString(owner),
- autoDelete, exclusive,virtualHost, priorities, arguments);
+ super(name, durable, owner, autoDelete, exclusive, virtualHost, new PriorityQueueList.Factory(priorities), arguments);
}
public int getPriorities()
{
return ((PriorityQueueList) _entries).getPriorities();
}
-
- @Override
- protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
- {
- // check that all subscriptions are not in advance of the entry
- SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator();
- while(subIter.advance() && entry.isAvailable())
- {
- final Subscription subscription = subIter.getNode().getSubscription();
- if(!subscription.isClosed())
- {
- QueueContext context = (QueueContext) subscription.getQueueContext();
- if(context != null)
- {
- QueueEntry subnode = context._lastSeenEntry;
- QueueEntry released = context._releasedEntry;
- while(subnode != null && entry.compareTo(subnode) < 0 && entry.isAvailable() && (released == null || released.compareTo(entry) < 0))
- {
- if(QueueContext._releasedUpdater.compareAndSet(context,released,entry))
- {
- break;
- }
- else
- {
- subnode = context._lastSeenEntry;
- released = context._releasedEntry;
- }
- }
- }
- }
-
- }
- }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
index bee55118ba..e4a6f01930 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
@@ -20,22 +20,22 @@
*/
package org.apache.qpid.server.queue;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.configuration.QueueConfiguration;
-
-import java.util.Map;
-import java.util.HashMap;
+import org.apache.qpid.server.virtualhost.VirtualHost;
public class AMQQueueFactory
{
- public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities");
+ public static final String X_QPID_PRIORITIES = "x-qpid-priorities";
public static final String QPID_LVQ_KEY = "qpid.LVQ_key";
public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue";
public static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key";
+ public static final String QPID_QUEUE_SORT_KEY = "qpid.queue_sort_key";
private abstract static class QueueProperty
{
@@ -157,9 +157,11 @@ public class AMQQueueFactory
String description = "Permission denied: queue-name '" + queueName + "'";
throw new AMQSecurityException(description);
}
-
+
int priorities = 1;
String conflationKey = null;
+ String sortingKey = null;
+
if(arguments != null)
{
if(arguments.containsKey(QPID_LAST_VALUE_QUEUE) || arguments.containsKey(QPID_LAST_VALUE_QUEUE_KEY))
@@ -170,24 +172,32 @@ public class AMQQueueFactory
conflationKey = QPID_LVQ_KEY;
}
}
- else if(arguments.containsKey(X_QPID_PRIORITIES.toString()))
+ else if(arguments.containsKey(X_QPID_PRIORITIES))
{
- Object prioritiesObj = arguments.get(X_QPID_PRIORITIES.toString());
+ Object prioritiesObj = arguments.get(X_QPID_PRIORITIES);
if(prioritiesObj instanceof Number)
{
priorities = ((Number)prioritiesObj).intValue();
}
}
+ else if(arguments.containsKey(QPID_QUEUE_SORT_KEY))
+ {
+ sortingKey = (String)arguments.get(QPID_QUEUE_SORT_KEY);
+ }
}
AMQQueue q;
- if(conflationKey != null)
+ if(sortingKey != null)
+ {
+ q = new SortedQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, sortingKey);
+ }
+ else if(conflationKey != null)
{
q = new ConflationQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, conflationKey);
}
else if(priorities > 1)
{
- q = new AMQPriorityQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, priorities, arguments);
+ q = new AMQPriorityQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, priorities);
}
else
{
@@ -223,26 +233,22 @@ public class AMQQueueFactory
boolean exclusive = config.getExclusive();
String owner = config.getOwner();
Map<String,Object> arguments = null;
+
if(config.isLVQ() || config.getLVQKey() != null)
{
-
arguments = new HashMap<String,Object>();
arguments.put(QPID_LAST_VALUE_QUEUE, 1);
arguments.put(QPID_LAST_VALUE_QUEUE_KEY, config.getLVQKey() == null ? QPID_LVQ_KEY : config.getLVQKey());
}
- else
+ else if (config.getPriority() || config.getPriorities() > 0)
{
- boolean priority = config.getPriority();
- int priorities = config.getPriorities();
- if(priority || priorities > 0)
- {
- arguments = new HashMap<String,Object>();
- if (priorities < 0)
- {
- priorities = 10;
- }
- arguments.put("x-qpid-priorities", priorities);
- }
+ arguments = new HashMap<String,Object>();
+ arguments.put(X_QPID_PRIORITIES, config.getPriorities() < 0 ? 10 : config.getPriorities());
+ }
+ else if (config.getQueueSortKey() != null && !"".equals(config.getQueueSortKey()))
+ {
+ arguments = new HashMap<String,Object>();
+ arguments.put(QPID_QUEUE_SORT_KEY, config.getQueueSortKey());
}
AMQQueue q = createAMQQueueImpl(queueName, durable, owner, autodelete, exclusive, host, arguments);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
index 2c1883e763..c4762c98c9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
@@ -54,7 +54,7 @@ public class ConflationQueueList extends SimpleQueueEntryList
@Override
- public QueueEntry add(final ServerMessage message)
+ public ConflationQueueEntry add(final ServerMessage message)
{
ConflationQueueEntry entry = (ConflationQueueEntry) (super.add(message));
AtomicReference<QueueEntry> latestValueReference = null;
@@ -117,7 +117,7 @@ public class ConflationQueueList extends SimpleQueueEntryList
}
}
- private final class ConflationQueueEntry extends QueueEntryImpl
+ private final class ConflationQueueEntry extends SimpleQueueEntryImpl
{
@@ -158,7 +158,7 @@ public class ConflationQueueList extends SimpleQueueEntryList
_conflationKey = conflationKey;
}
- public QueueEntryList createQueueEntryList(AMQQueue queue)
+ public ConflationQueueList createQueueEntryList(AMQQueue queue)
{
return new ConflationQueueList(queue, _conflationKey);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
new file mode 100644
index 0000000000..b16d1eb8e3
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
@@ -0,0 +1,53 @@
+package org.apache.qpid.server.queue;
+
+import java.util.Map;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionList;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public abstract class OutOfOrderQueue extends SimpleAMQQueue
+{
+ protected OutOfOrderQueue(String name, boolean durable, String owner,
+ boolean autoDelete, boolean exclusive, VirtualHost virtualHost,
+ QueueEntryListFactory entryListFactory, Map<String, Object> arguments)
+ {
+ super(name, durable, owner, autoDelete, exclusive, virtualHost, entryListFactory, arguments);
+ }
+
+ @Override
+ protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
+ {
+ // check that all subscriptions are not in advance of the entry
+ SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator();
+ while(subIter.advance() && !entry.isAcquired())
+ {
+ final Subscription subscription = subIter.getNode().getSubscription();
+ if(!subscription.isClosed())
+ {
+ QueueContext context = (QueueContext) subscription.getQueueContext();
+ if(context != null)
+ {
+ QueueEntry subnode = context._lastSeenEntry;
+ QueueEntry released = context._releasedEntry;
+
+ while(subnode != null && entry.compareTo(subnode) < 0 && !entry.isAcquired()
+ && (released == null || released.compareTo(entry) > 0))
+ {
+ if(QueueContext._releasedUpdater.compareAndSet(context,released,entry))
+ {
+ break;
+ }
+ else
+ {
+ subnode = context._lastSeenEntry;
+ released = context._releasedEntry;
+ }
+
+ }
+ }
+ }
+
+ }
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
index 0c6b84d2b6..79d3ab5bd0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
@@ -20,21 +20,19 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.framing.CommonContentHeaderProperties;
-import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.ServerMessage;
-public class PriorityQueueList implements QueueEntryList
+public class PriorityQueueList implements QueueEntryList<SimpleQueueEntryImpl>
{
private final AMQQueue _queue;
- private final QueueEntryList[] _priorityLists;
+ private final SimpleQueueEntryList[] _priorityLists;
private final int _priorities;
private final int _priorityOffset;
public PriorityQueueList(AMQQueue queue, int priorities)
{
_queue = queue;
- _priorityLists = new QueueEntryList[priorities];
+ _priorityLists = new SimpleQueueEntryList[priorities];
_priorities = priorities;
_priorityOffset = 5-((priorities + 1)/2);
for(int i = 0; i < priorities; i++)
@@ -53,7 +51,7 @@ public class PriorityQueueList implements QueueEntryList
return _queue;
}
- public QueueEntry add(ServerMessage message)
+ public SimpleQueueEntryImpl add(ServerMessage message)
{
int index = message.getMessageHeader().getPriority() - _priorityOffset;
if(index >= _priorities)
@@ -68,31 +66,30 @@ public class PriorityQueueList implements QueueEntryList
}
- public QueueEntry next(QueueEntry node)
+ public SimpleQueueEntryImpl next(SimpleQueueEntryImpl node)
{
- QueueEntryImpl nodeImpl = (QueueEntryImpl)node;
- QueueEntry next = nodeImpl.getNext();
+ SimpleQueueEntryImpl next = node.getNextValidEntry();
if(next == null)
{
- QueueEntryList nodeEntryList = nodeImpl.getQueueEntryList();
+ final QueueEntryList<?> nodeEntryList = node.getQueueEntryList();
int index;
for(index = _priorityLists.length-1; _priorityLists[index] != nodeEntryList; index--);
while(next == null && index != 0)
{
index--;
- next = ((QueueEntryImpl)_priorityLists[index].getHead()).getNext();
+ next = _priorityLists[index].getHead().getNextValidEntry();
}
}
return next;
}
- private final class PriorityQueueEntryListIterator implements QueueEntryIterator
+ private final class PriorityQueueEntryListIterator implements QueueEntryIterator<SimpleQueueEntryImpl>
{
- private final QueueEntryIterator[] _iterators = new QueueEntryIterator[ _priorityLists.length ];
- private QueueEntry _lastNode;
+ private final SimpleQueueEntryList.QueueEntryIteratorImpl[] _iterators = new SimpleQueueEntryList.QueueEntryIteratorImpl[ _priorityLists.length ];
+ private SimpleQueueEntryImpl _lastNode;
PriorityQueueEntryListIterator()
{
@@ -116,7 +113,7 @@ public class PriorityQueueList implements QueueEntryList
return true;
}
- public QueueEntry getNode()
+ public SimpleQueueEntryImpl getNode()
{
return _lastNode;
}
@@ -135,16 +132,21 @@ public class PriorityQueueList implements QueueEntryList
}
}
- public QueueEntryIterator iterator()
+ public PriorityQueueEntryListIterator iterator()
{
return new PriorityQueueEntryListIterator();
}
- public QueueEntry getHead()
+ public SimpleQueueEntryImpl getHead()
{
return _priorityLists[_priorities-1].getHead();
}
+ public void entryDeleted(final SimpleQueueEntryImpl queueEntry)
+ {
+
+ }
+
static class Factory implements QueueEntryListFactory
{
private final int _priorities;
@@ -154,7 +156,7 @@ public class PriorityQueueList implements QueueEntryList
_priorities = priorities;
}
- public QueueEntryList createQueueEntryList(AMQQueue queue)
+ public PriorityQueueList createQueueEntryList(AMQQueue queue)
{
return new PriorityQueueList(queue, _priorities);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index be29245901..c1fb0258fa 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -214,6 +214,10 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable
boolean isQueueDeleted();
+ QueueEntry getNextNode();
+
+ QueueEntry getNextValidEntry();
+
void addStateChangeListener(StateChangeListener listener);
boolean removeStateChangeListener(StateChangeListener listener);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 3d011b99c0..ee1d214c1f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -38,16 +38,11 @@ import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-
-public class QueueEntryImpl implements QueueEntry
+public abstract class QueueEntryImpl implements QueueEntry
{
-
- /**
- * Used for debugging purposes.
- */
private static final Logger _log = Logger.getLogger(QueueEntryImpl.class);
- private final SimpleQueueEntryList _queueEntryList;
+ private final QueueEntryList _queueEntryList;
private MessageReference _message;
@@ -80,22 +75,20 @@ public class QueueEntryImpl implements QueueEntry
private volatile long _entryId;
- volatile QueueEntryImpl _next;
-
private static final int DELIVERED_TO_CONSUMER = 1;
private static final int REDELIVERED = 2;
private volatile int _deliveryState;
- QueueEntryImpl(SimpleQueueEntryList queueEntryList)
+ public QueueEntryImpl(QueueEntryList<?> queueEntryList)
{
this(queueEntryList,null,Long.MIN_VALUE);
_state = DELETED_STATE;
}
- public QueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message, final long entryId)
+ public QueueEntryImpl(QueueEntryList<?> queueEntryList, ServerMessage message, final long entryId)
{
_queueEntryList = queueEntryList;
@@ -104,7 +97,7 @@ public class QueueEntryImpl implements QueueEntry
_entryIdUpdater.set(this, entryId);
}
- public QueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message)
+ public QueueEntryImpl(QueueEntryList<?> queueEntryList, ServerMessage message)
{
_queueEntryList = queueEntryList;
_message = message == null ? null : message.newReference();
@@ -316,16 +309,15 @@ public class QueueEntryImpl implements QueueEntry
public Subscription getDeliveredSubscription()
{
- EntryState state = _state;
- if (state instanceof SubscriptionAcquiredState)
- {
- return ((SubscriptionAcquiredState) state).getSubscription();
- }
- else
- {
- return null;
- }
-
+ EntryState state = _state;
+ if (state instanceof SubscriptionAcquiredState)
+ {
+ return ((SubscriptionAcquiredState) state).getSubscription();
+ }
+ else
+ {
+ return null;
+ }
}
public void reject()
@@ -497,33 +489,6 @@ public class QueueEntryImpl implements QueueEntry
return getEntryId() > other.getEntryId() ? 1 : getEntryId() < other.getEntryId() ? -1 : 0;
}
- public QueueEntryImpl getNext()
- {
-
- QueueEntryImpl next = nextNode();
- while(next != null && next.isDispensed() )
- {
-
- final QueueEntryImpl newNext = next.nextNode();
- if(newNext != null)
- {
- SimpleQueueEntryList._nextUpdater.compareAndSet(this,next, newNext);
- next = nextNode();
- }
- else
- {
- next = null;
- }
-
- }
- return next;
- }
-
- QueueEntryImpl nextNode()
- {
- return _next;
- }
-
public boolean isDeleted()
{
return _state == DELETED_STATE;
@@ -535,7 +500,7 @@ public class QueueEntryImpl implements QueueEntry
if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
{
- _queueEntryList.advanceHead();
+ _queueEntryList.entryDeleted(this);
return true;
}
else
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java
index c5c115a2d1..73ebb0f300 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java
@@ -20,11 +20,11 @@
*/
package org.apache.qpid.server.queue;
-public interface QueueEntryIterator
+public interface QueueEntryIterator<QE extends QueueEntry>
{
boolean atTail();
- QueueEntry getNode();
+ QE getNode();
boolean advance();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
index b4042ce02c..77c4b912e0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
@@ -22,15 +22,17 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.message.ServerMessage;
-public interface QueueEntryList
+public interface QueueEntryList<Q extends QueueEntry>
{
AMQQueue getQueue();
- QueueEntry add(ServerMessage message);
+ Q add(ServerMessage message);
- QueueEntry next(QueueEntry node);
+ Q next(Q node);
- QueueEntryIterator iterator();
+ QueueEntryIterator<Q> iterator();
- QueueEntry getHead();
+ Q getHead();
+
+ void entryDeleted(Q queueEntry);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java
new file mode 100644
index 0000000000..0707dc045c
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.message.ServerMessage;
+
+public class SimpleQueueEntryImpl extends QueueEntryImpl
+{
+ volatile SimpleQueueEntryImpl _next;
+
+ public SimpleQueueEntryImpl(SimpleQueueEntryList queueEntryList)
+ {
+ super(queueEntryList);
+ }
+
+ public SimpleQueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message, final long entryId)
+ {
+ super(queueEntryList, message, entryId);
+ }
+
+ public SimpleQueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message)
+ {
+ super(queueEntryList, message);
+ }
+
+ public SimpleQueueEntryImpl getNextNode()
+ {
+ return _next;
+ }
+
+ public SimpleQueueEntryImpl getNextValidEntry()
+ {
+
+ SimpleQueueEntryImpl next = getNextNode();
+ while(next != null && next.isDispensed())
+ {
+
+ final SimpleQueueEntryImpl newNext = next.getNextNode();
+ if(newNext != null)
+ {
+ SimpleQueueEntryList._nextUpdater.compareAndSet(this,next, newNext);
+ next = getNextNode();
+ }
+ else
+ {
+ next = null;
+ }
+
+ }
+ return next;
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
index 46baab8c85..0bb5dcc219 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
@@ -1,10 +1,3 @@
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.message.ServerMessage;
-
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import java.util.concurrent.atomic.AtomicLong;
-
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -25,25 +18,31 @@ import java.util.concurrent.atomic.AtomicLong;
* under the License.
*
*/
-public class SimpleQueueEntryList implements QueueEntryList
+package org.apache.qpid.server.queue;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.apache.qpid.server.message.ServerMessage;
+
+public class SimpleQueueEntryList implements QueueEntryList<SimpleQueueEntryImpl>
{
- private final QueueEntryImpl _head;
+ private final SimpleQueueEntryImpl _head;
- private volatile QueueEntryImpl _tail;
+ private volatile SimpleQueueEntryImpl _tail;
- static final AtomicReferenceFieldUpdater<SimpleQueueEntryList, QueueEntryImpl>
+ static final AtomicReferenceFieldUpdater<SimpleQueueEntryList, SimpleQueueEntryImpl>
_tailUpdater =
AtomicReferenceFieldUpdater.newUpdater
- (SimpleQueueEntryList.class, QueueEntryImpl.class, "_tail");
+ (SimpleQueueEntryList.class, SimpleQueueEntryImpl.class, "_tail");
private final AMQQueue _queue;
- static final AtomicReferenceFieldUpdater<QueueEntryImpl, QueueEntryImpl>
+ static final AtomicReferenceFieldUpdater<SimpleQueueEntryImpl, SimpleQueueEntryImpl>
_nextUpdater =
AtomicReferenceFieldUpdater.newUpdater
- (QueueEntryImpl.class, QueueEntryImpl.class, "_next");
+ (SimpleQueueEntryImpl.class, SimpleQueueEntryImpl.class, "_next");
private AtomicLong _scavenges = new AtomicLong(0L);
private final long _scavengeCount = Integer.getInteger("qpid.queue.scavenge_count", 50);
@@ -52,14 +51,14 @@ public class SimpleQueueEntryList implements QueueEntryList
public SimpleQueueEntryList(AMQQueue queue)
{
_queue = queue;
- _head = new QueueEntryImpl(this);
+ _head = new SimpleQueueEntryImpl(this);
_tail = _head;
}
void advanceHead()
{
- QueueEntryImpl next = _head.nextNode();
- QueueEntryImpl newNext = _head.getNext();
+ SimpleQueueEntryImpl next = _head.getNextNode();
+ SimpleQueueEntryImpl newNext = _head.getNextValidEntry();
if (next == newNext)
{
@@ -73,11 +72,11 @@ public class SimpleQueueEntryList implements QueueEntryList
void scavenge()
{
- QueueEntryImpl next = _head.getNext();
+ SimpleQueueEntryImpl next = _head.getNextValidEntry();
while (next != null)
{
- next = next.getNext();
+ next = next.getNextValidEntry();
}
}
@@ -88,13 +87,13 @@ public class SimpleQueueEntryList implements QueueEntryList
}
- public QueueEntry add(ServerMessage message)
+ public SimpleQueueEntryImpl add(ServerMessage message)
{
- QueueEntryImpl node = createQueueEntry(message);
+ SimpleQueueEntryImpl node = createQueueEntry(message);
for (;;)
{
- QueueEntryImpl tail = _tail;
- QueueEntryImpl next = tail.nextNode();
+ SimpleQueueEntryImpl tail = _tail;
+ SimpleQueueEntryImpl next = tail.getNextNode();
if (tail == _tail)
{
if (next == null)
@@ -115,23 +114,22 @@ public class SimpleQueueEntryList implements QueueEntryList
}
}
- protected QueueEntryImpl createQueueEntry(ServerMessage message)
+ protected SimpleQueueEntryImpl createQueueEntry(ServerMessage message)
{
- return new QueueEntryImpl(this, message);
+ return new SimpleQueueEntryImpl(this, message);
}
- public QueueEntry next(QueueEntry node)
+ public SimpleQueueEntryImpl next(SimpleQueueEntryImpl node)
{
- return ((QueueEntryImpl)node).getNext();
+ return node.getNextValidEntry();
}
-
- public static class QueueEntryIteratorImpl implements QueueEntryIterator
+ public static class QueueEntryIteratorImpl implements QueueEntryIterator<SimpleQueueEntryImpl>
{
- private QueueEntryImpl _lastNode;
+ private SimpleQueueEntryImpl _lastNode;
- QueueEntryIteratorImpl(QueueEntryImpl startNode)
+ QueueEntryIteratorImpl(SimpleQueueEntryImpl startNode)
{
_lastNode = startNode;
}
@@ -139,14 +137,12 @@ public class SimpleQueueEntryList implements QueueEntryList
public boolean atTail()
{
- return _lastNode.nextNode() == null;
+ return _lastNode.getNextNode() == null;
}
- public QueueEntry getNode()
+ public SimpleQueueEntryImpl getNode()
{
-
return _lastNode;
-
}
public boolean advance()
@@ -154,10 +150,10 @@ public class SimpleQueueEntryList implements QueueEntryList
if(!atTail())
{
- QueueEntryImpl nextNode = _lastNode.nextNode();
- while(nextNode.isDispensed() && nextNode.nextNode() != null)
+ SimpleQueueEntryImpl nextNode = _lastNode.getNextNode();
+ while(nextNode.isDispensed() && nextNode.getNextNode() != null)
{
- nextNode = nextNode.nextNode();
+ nextNode = nextNode.getNextNode();
}
_lastNode = nextNode;
return true;
@@ -173,21 +169,26 @@ public class SimpleQueueEntryList implements QueueEntryList
}
- public QueueEntryIterator iterator()
+ public QueueEntryIteratorImpl iterator()
{
return new QueueEntryIteratorImpl(_head);
}
- public QueueEntry getHead()
+ public SimpleQueueEntryImpl getHead()
{
return _head;
}
+ public void entryDeleted(SimpleQueueEntryImpl queueEntry)
+ {
+ advanceHead();
+ }
+
static class Factory implements QueueEntryListFactory
{
- public QueueEntryList createQueueEntryList(AMQQueue queue)
+ public SimpleQueueEntryList createQueueEntryList(AMQQueue queue)
{
return new SimpleQueueEntryList(queue);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
new file mode 100644
index 0000000000..3f02442704
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
@@ -0,0 +1,30 @@
+package org.apache.qpid.server.queue;
+
+import java.util.Map;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class SortedQueue extends OutOfOrderQueue
+{
+ private String _sortedPropertyName;
+
+ protected SortedQueue(final String name, final boolean durable,
+ final String owner, final boolean autoDelete, final boolean exclusive,
+ final VirtualHost virtualHost, Map<String, Object> arguments, String sortedPropertyName)
+ {
+ super(name, durable, owner, autoDelete, exclusive, virtualHost,
+ new SortedQueueEntryListFactory(sortedPropertyName), arguments);
+ this._sortedPropertyName = sortedPropertyName;
+ }
+
+ public String getSortedPropertyName()
+ {
+ return _sortedPropertyName;
+ }
+
+ public synchronized void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
+ {
+ super.enqueue(message, action);
+ }
+} \ No newline at end of file
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryImpl.java
new file mode 100644
index 0000000000..1052adbe67
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryImpl.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.message.ServerMessage;
+
+/**
+ * An implementation of QueueEntryImpl to be used in SortedQueueEntryList.
+ */
+public class SortedQueueEntryImpl extends QueueEntryImpl
+{
+ public static enum Colour
+ {
+ RED, BLACK
+ };
+
+ private volatile SortedQueueEntryImpl _next;
+ private SortedQueueEntryImpl _prev;
+ private String _key;
+
+ private Colour _colour = Colour.BLACK;
+ private SortedQueueEntryImpl _parent;
+ private SortedQueueEntryImpl _left;
+ private SortedQueueEntryImpl _right;
+
+ public SortedQueueEntryImpl(final SortedQueueEntryList queueEntryList)
+ {
+ super(queueEntryList);
+ }
+
+ public SortedQueueEntryImpl(final SortedQueueEntryList queueEntryList,
+ final ServerMessage message, final long entryId)
+ {
+ super(queueEntryList, message, entryId);
+ }
+
+ @Override
+ public int compareTo(final QueueEntry o)
+ {
+ final String otherKey = ((SortedQueueEntryImpl) o)._key;
+ final int compare = _key == null ? (otherKey == null ? 0 : -1) : otherKey == null ? 1 : _key.compareTo(otherKey);
+ return compare == 0 ? super.compareTo(o) : compare;
+ }
+
+ public Colour getColour()
+ {
+ return _colour;
+ }
+
+ public String getKey()
+ {
+ return _key;
+ }
+
+ public SortedQueueEntryImpl getLeft()
+ {
+ return _left;
+ }
+
+ public SortedQueueEntryImpl getNextNode()
+ {
+ return _next;
+ }
+
+ @Override
+ public SortedQueueEntryImpl getNextValidEntry()
+ {
+ return getNextNode();
+ }
+
+ public SortedQueueEntryImpl getParent()
+ {
+ return _parent;
+ }
+
+ public SortedQueueEntryImpl getPrev()
+ {
+ return _prev;
+ }
+
+ public SortedQueueEntryImpl getRight()
+ {
+ return _right;
+ }
+
+ public void setColour(final Colour colour)
+ {
+ _colour = colour;
+ }
+
+ public void setKey(final String key)
+ {
+ _key = key;
+ }
+
+ public void setLeft(final SortedQueueEntryImpl left)
+ {
+ _left = left;
+ }
+
+ public void setNext(final SortedQueueEntryImpl next)
+ {
+ _next = next;
+ }
+
+ public void setParent(final SortedQueueEntryImpl parent)
+ {
+ _parent = parent;
+ }
+
+ public void setPrev(final SortedQueueEntryImpl prev)
+ {
+ _prev = prev;
+ }
+
+ public void setRight(final SortedQueueEntryImpl right)
+ {
+ _right = right;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "(" + (_colour == Colour.RED ? "Red," : "Black,") + _key + ")";
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
new file mode 100644
index 0000000000..5f8ab16c06
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
@@ -0,0 +1,665 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.message.ServerMessage;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.server.queue.SortedQueueEntryImpl.Colour;
+import org.apache.qpid.server.store.StoreContext;
+
+/**
+ * A sorted implementation of QueueEntryList.
+ * Uses the red/black tree algorithm specified in "Introduction to Algorithms".
+ * ISBN-10: 0262033844
+ * ISBN-13: 978-0262033848
+ * @see http://en.wikipedia.org/wiki/Red-black_tree
+ */
+public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl>
+{
+ private final SortedQueueEntryImpl _head;
+ private SortedQueueEntryImpl _root;
+ private long _entryId = Long.MIN_VALUE;
+ private final Object _lock = new Object();
+ private final AMQQueue _queue;
+ private final String _propertyName;
+
+ public SortedQueueEntryList(final AMQQueue queue, final String propertyName)
+ {
+ _queue = queue;
+ _head = new SortedQueueEntryImpl(this);
+ _propertyName = propertyName;
+ }
+
+ @Override
+ public AMQQueue getQueue()
+ {
+ return _queue;
+ }
+
+ @Override
+ public SortedQueueEntryImpl add(final ServerMessage message)
+ {
+ synchronized(_lock)
+ {
+ String key = null;
+ final Object val = message.getMessageHeader().getHeader(_propertyName);
+ if(val != null)
+ {
+ key = val.toString();
+ }
+
+ final SortedQueueEntryImpl entry = new SortedQueueEntryImpl(this,message, ++_entryId);
+ entry.setKey(key);
+
+ insert(entry);
+
+ return entry;
+ }
+ }
+
+ /**
+ * Red Black Tree insert implementation.
+ * @param entry the entry to insert.
+ */
+ private void insert(final SortedQueueEntryImpl entry)
+ {
+ SortedQueueEntryImpl node = _root;
+ if((node = _root) == null)
+ {
+ _root = entry;
+ _head.setNext(entry);
+ entry.setPrev(_head);
+ return;
+ }
+ else
+ {
+ SortedQueueEntryImpl parent = null;
+ while(node != null)
+ {
+ parent = node;
+ if(entry.compareTo(node) < 0)
+ {
+ node = node.getLeft();
+ }
+ else
+ {
+ node = node.getRight();
+ }
+ }
+ entry.setParent(parent);
+
+ if(entry.compareTo(parent) < 0)
+ {
+ parent.setLeft(entry);
+ final SortedQueueEntryImpl prev = parent.getPrev();
+ entry.setNext(parent);
+ prev.setNext(entry);
+ entry.setPrev(prev);
+ parent.setPrev(entry);
+ }
+ else
+ {
+ parent.setRight(entry);
+
+ final SortedQueueEntryImpl next = parent.getNextValidEntry();
+ entry.setNext(next);
+ parent.setNext(entry);
+
+ if(next != null)
+ {
+ next.setPrev(entry);
+ }
+ entry.setPrev(parent);
+ }
+ }
+ entry.setColour(Colour.RED);
+ insertFixup(entry);
+ }
+
+ private void insertFixup(SortedQueueEntryImpl entry)
+ {
+ while(isParentColour(entry, Colour.RED))
+ {
+ final SortedQueueEntryImpl grandparent = nodeGrandparent(entry);
+
+ if(nodeParent(entry) == leftChild(grandparent))
+ {
+ final SortedQueueEntryImpl y = rightChild(grandparent);
+ if(isNodeColour(y, Colour.RED))
+ {
+ setColour(nodeParent(entry), Colour.BLACK);
+ setColour(y, Colour.BLACK);
+ setColour(grandparent, Colour.RED);
+ entry = grandparent;
+ }
+ else
+ {
+ if(entry == rightChild(nodeParent(entry)))
+ {
+ entry = nodeParent(entry);
+ leftRotate(entry);
+ }
+ setColour(nodeParent(entry), Colour.BLACK);
+ setColour(nodeGrandparent(entry), Colour.RED);
+ rightRotate(nodeGrandparent(entry));
+ }
+ }
+ else
+ {
+ final SortedQueueEntryImpl y = leftChild(grandparent);
+ if(isNodeColour(y, Colour.RED))
+ {
+ setColour(nodeParent(entry), Colour.BLACK);
+ setColour(y, Colour.BLACK);
+ setColour(grandparent, Colour.RED);
+ entry = grandparent;
+ }
+ else
+ {
+ if(entry == leftChild(nodeParent(entry)))
+ {
+ entry = nodeParent(entry);
+ rightRotate(entry);
+ }
+ setColour(nodeParent(entry), Colour.BLACK);
+ setColour(nodeGrandparent(entry), Colour.RED);
+ leftRotate(nodeGrandparent(entry));
+ }
+ }
+ }
+ _root.setColour(Colour.BLACK);
+ }
+
+ private void leftRotate(final SortedQueueEntryImpl entry)
+ {
+ if(entry != null)
+ {
+ final SortedQueueEntryImpl rightChild = rightChild(entry);
+ entry.setRight(rightChild.getLeft());
+ if(entry.getRight() != null)
+ {
+ entry.getRight().setParent(entry);
+ }
+ rightChild.setParent(entry.getParent());
+ if(entry.getParent() == null)
+ {
+ _root = rightChild;
+ }
+ else if(entry == entry.getParent().getLeft())
+ {
+ entry.getParent().setLeft(rightChild);
+ }
+ else
+ {
+ entry.getParent().setRight(rightChild);
+ }
+ rightChild.setLeft(entry);
+ entry.setParent(rightChild);
+ }
+ }
+
+ private void rightRotate(final SortedQueueEntryImpl entry)
+ {
+ if(entry != null)
+ {
+ final SortedQueueEntryImpl leftChild = leftChild(entry);
+ entry.setLeft(leftChild.getRight());
+ if(entry.getLeft() != null)
+ {
+ leftChild.getRight().setParent(entry);
+ }
+ leftChild.setParent(entry.getParent());
+ if(leftChild.getParent() == null)
+ {
+ _root = leftChild;
+ }
+ else if(entry == entry.getParent().getRight())
+ {
+ entry.getParent().setRight(leftChild);
+ }
+ else
+ {
+ entry.getParent().setLeft(leftChild);
+ }
+ leftChild.setRight(entry);
+ entry.setParent(leftChild);
+ }
+ }
+
+ private void setColour(final SortedQueueEntryImpl node, final Colour colour)
+ {
+ if(node != null)
+ {
+ node.setColour(colour);
+ }
+ }
+
+ private SortedQueueEntryImpl leftChild(final SortedQueueEntryImpl node)
+ {
+ return node == null ? null : node.getLeft();
+ }
+
+ private SortedQueueEntryImpl rightChild(final SortedQueueEntryImpl node)
+ {
+ return node == null ? null : node.getRight();
+ }
+
+ private SortedQueueEntryImpl nodeParent(final SortedQueueEntryImpl node)
+ {
+ return node == null ? null : node.getParent();
+ }
+
+ private SortedQueueEntryImpl nodeGrandparent(final SortedQueueEntryImpl node)
+ {
+ return nodeParent(nodeParent(node));
+ }
+
+ private boolean isParentColour(final SortedQueueEntryImpl node, final SortedQueueEntryImpl.Colour colour)
+ {
+
+ return node != null && isNodeColour(node.getParent(), colour);
+ }
+
+ protected boolean isNodeColour(final SortedQueueEntryImpl node, final SortedQueueEntryImpl.Colour colour)
+ {
+ return (node == null ? Colour.BLACK : node.getColour()) == colour;
+ }
+
+ @Override
+ public SortedQueueEntryImpl next(final SortedQueueEntryImpl node)
+ {
+ synchronized(_lock)
+ {
+ if(node.isDispensed() && _head != node)
+ {
+ SortedQueueEntryImpl current = _head;
+ SortedQueueEntryImpl next;
+ while(current != null)
+ {
+ next = current.getNextValidEntry();
+ if(current.compareTo(node)>0 && !current.isDispensed())
+ {
+ break;
+ }
+ else
+ {
+ current = next;
+ }
+ }
+ return current;
+ }
+ else
+ {
+ return node.getNextValidEntry();
+ }
+ }
+ }
+
+ @Override
+ public QueueEntryIterator<SortedQueueEntryImpl> iterator()
+ {
+ return new QueueEntryIteratorImpl(_head);
+ }
+
+ @Override
+ public SortedQueueEntryImpl getHead()
+ {
+ return _head;
+ }
+
+ protected SortedQueueEntryImpl getRoot()
+ {
+ return _root;
+ }
+
+ @Override
+ public void entryDeleted(final SortedQueueEntryImpl entry)
+ {
+ synchronized(_lock)
+ {
+ // If the node to be removed has two children, we swap the position
+ // of the node and its successor in the tree
+ if(leftChild(entry) != null && rightChild(entry) != null)
+ {
+ swapWithSuccessor(entry);
+ }
+
+ // Then deal with the easy doubly linked list deletion (need to do
+ // this after the swap as the swap uses next
+ final SortedQueueEntryImpl prev = entry.getPrev();
+ if(prev != null)
+ {
+ prev.setNext(entry.getNextValidEntry());
+ }
+
+ final SortedQueueEntryImpl next = entry.getNextValidEntry();
+ if(next != null)
+ {
+ next.setPrev(prev);
+ }
+
+ // now deal with splicing
+ final SortedQueueEntryImpl chosenChild;
+
+ if(leftChild(entry) != null)
+ {
+ chosenChild = leftChild(entry);
+ }
+ else
+ {
+ chosenChild = rightChild(entry);
+ }
+
+ if(chosenChild != null)
+ {
+ // we have one child (x), we can move it up to replace x;
+ chosenChild.setParent(entry.getParent());
+ if(chosenChild.getParent() == null)
+ {
+ _root = chosenChild;
+ }
+ else if(entry == entry.getParent().getLeft())
+ {
+ entry.getParent().setLeft(chosenChild);
+ }
+ else
+ {
+ entry.getParent().setRight(chosenChild);
+ }
+
+ entry.setLeft(null);
+ entry.setRight(null);
+ entry.setParent(null);
+
+ if(entry.getColour() == Colour.BLACK)
+ {
+ deleteFixup(chosenChild);
+ }
+
+ }
+ else
+ {
+ // no children
+ if(entry.getParent() == null)
+ {
+ // no parent either - the tree is empty
+ _root = null;
+ }
+ else
+ {
+ if(entry.getColour() == Colour.BLACK)
+ {
+ deleteFixup(entry);
+ }
+
+ if(entry.getParent() != null)
+ {
+ if(entry.getParent().getLeft() == entry)
+ {
+ entry.getParent().setLeft(null);
+ }
+ else if(entry.getParent().getRight() == entry)
+ {
+ entry.getParent().setRight(null);
+ }
+ entry.setParent(null);
+ }
+ }
+ }
+
+ }
+ }
+
+ /**
+ * Swaps the position of the node in the tree with it's successor
+ * (that is the node with the next highest key)
+ * @param entry
+ */
+ private void swapWithSuccessor(final SortedQueueEntryImpl entry)
+ {
+ final SortedQueueEntryImpl next = entry.getNextValidEntry();
+ final SortedQueueEntryImpl nextParent = next.getParent();
+ final SortedQueueEntryImpl nextLeft = next.getLeft();
+ final SortedQueueEntryImpl nextRight = next.getRight();
+ final Colour nextColour = next.getColour();
+
+ // Special case - the successor is the right child of the node
+ if(next == entry.getRight())
+ {
+ next.setParent(entry.getParent());
+ if(next.getParent() == null)
+ {
+ _root = next;
+ }
+ else if(next.getParent().getLeft() == entry)
+ {
+ next.getParent().setLeft(next);
+ }
+ else
+ {
+ next.getParent().setRight(next);
+ }
+
+ next.setRight(entry);
+ entry.setParent(next);
+ next.setLeft(entry.getLeft());
+
+ if(next.getLeft() != null)
+ {
+ next.getLeft().setParent(next);
+ }
+
+ next.setColour(entry.getColour());
+ entry.setColour(nextColour);
+ entry.setLeft(nextLeft);
+
+ if(nextLeft != null)
+ {
+ nextLeft.setParent(entry);
+ }
+ entry.setRight(nextRight);
+ if(nextRight != null)
+ {
+ nextRight.setParent(entry);
+ }
+ }
+ else
+ {
+ next.setParent(entry.getParent());
+ if(next.getParent() == null)
+ {
+ _root = next;
+ }
+ else if(next.getParent().getLeft() == entry)
+ {
+ next.getParent().setLeft(next);
+ }
+ else
+ {
+ next.getParent().setRight(next);
+ }
+
+ next.setLeft(entry.getLeft());
+ if(next.getLeft() != null)
+ {
+ next.getLeft().setParent(next);
+ }
+ next.setRight(entry.getRight());
+ if(next.getRight() != null)
+ {
+ next.getRight().setParent(next);
+ }
+ next.setColour(entry.getColour());
+
+ entry.setParent(nextParent);
+ if(nextParent.getLeft() == next)
+ {
+ nextParent.setLeft(entry);
+ }
+ else
+ {
+ nextParent.setRight(entry);
+ }
+
+ entry.setLeft(nextLeft);
+ if(nextLeft != null)
+ {
+ nextLeft.setParent(entry);
+ }
+ entry.setRight(nextRight);
+ if(nextRight != null)
+ {
+ nextRight.setParent(entry);
+ }
+ entry.setColour(nextColour);
+ }
+ }
+
+ private void deleteFixup(SortedQueueEntryImpl entry)
+ {
+ int i = 0;
+ while(entry != null && entry != _root
+ && isNodeColour(entry, Colour.BLACK))
+ {
+ i++;
+
+ if(i > 1000)
+ {
+ return;
+ }
+
+ if(entry == leftChild(nodeParent(entry)))
+ {
+ SortedQueueEntryImpl rightSibling = rightChild(nodeParent(entry));
+ if(isNodeColour(rightSibling, Colour.RED))
+ {
+ setColour(rightSibling, Colour.BLACK);
+ nodeParent(entry).setColour(Colour.RED);
+ leftRotate(nodeParent(entry));
+ rightSibling = rightChild(nodeParent(entry));
+ }
+
+ if(isNodeColour(leftChild(rightSibling), Colour.BLACK)
+ && isNodeColour(rightChild(rightSibling), Colour.BLACK))
+ {
+ setColour(rightSibling, Colour.RED);
+ entry = nodeParent(entry);
+ }
+ else
+ {
+ if(isNodeColour(rightChild(rightSibling), Colour.BLACK))
+ {
+ setColour(leftChild(rightSibling), Colour.BLACK);
+ rightSibling.setColour(Colour.RED);
+ rightRotate(rightSibling);
+ rightSibling = rightChild(nodeParent(entry));
+ }
+ setColour(rightSibling, getColour(nodeParent(entry)));
+ setColour(nodeParent(entry), Colour.BLACK);
+ setColour(rightChild(rightSibling), Colour.BLACK);
+ leftRotate(nodeParent(entry));
+ entry = _root;
+ }
+ }
+ else
+ {
+ SortedQueueEntryImpl leftSibling = leftChild(nodeParent(entry));
+ if(isNodeColour(leftSibling, Colour.RED))
+ {
+ setColour(leftSibling, Colour.BLACK);
+ nodeParent(entry).setColour(Colour.RED);
+ rightRotate(nodeParent(entry));
+ leftSibling = leftChild(nodeParent(entry));
+ }
+
+ if(isNodeColour(leftChild(leftSibling), Colour.BLACK)
+ && isNodeColour(rightChild(leftSibling), Colour.BLACK))
+ {
+ setColour(leftSibling, Colour.RED);
+ entry = nodeParent(entry);
+ }
+ else
+ {
+ if(isNodeColour(leftChild(leftSibling), Colour.BLACK))
+ {
+ setColour(rightChild(leftSibling), Colour.BLACK);
+ leftSibling.setColour(Colour.RED);
+ leftRotate(leftSibling);
+ leftSibling = leftChild(nodeParent(entry));
+ }
+ setColour(leftSibling, getColour(nodeParent(entry)));
+ setColour(nodeParent(entry), Colour.BLACK);
+ setColour(leftChild(leftSibling), Colour.BLACK);
+ rightRotate(nodeParent(entry));
+ entry = _root;
+ }
+ }
+ }
+ setColour(entry, Colour.BLACK);
+ }
+
+ private Colour getColour(final SortedQueueEntryImpl x)
+ {
+ return x == null ? null : x.getColour();
+ }
+
+ public class QueueEntryIteratorImpl implements QueueEntryIterator<SortedQueueEntryImpl>
+ {
+ private SortedQueueEntryImpl _lastNode;
+
+ public QueueEntryIteratorImpl(final SortedQueueEntryImpl startNode)
+ {
+ _lastNode = startNode;
+ }
+
+ public boolean atTail()
+ {
+ return next(_lastNode) == null;
+ }
+
+ public SortedQueueEntryImpl getNode()
+ {
+ return _lastNode;
+ }
+
+ public boolean advance()
+ {
+ if(!atTail())
+ {
+ SortedQueueEntryImpl nextNode = next(_lastNode);
+ while(nextNode.isDispensed() && next(nextNode) != null)
+ {
+ nextNode = next(nextNode);
+ }
+ _lastNode = nextNode;
+ return true;
+
+ }
+ else
+ {
+ return false;
+ }
+ }
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java
new file mode 100644
index 0000000000..7a70795e77
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java
@@ -0,0 +1,19 @@
+package org.apache.qpid.server.queue;
+
+public class SortedQueueEntryListFactory implements QueueEntryListFactory
+{
+
+ private final String _propertyName;
+
+ public SortedQueueEntryListFactory(final String propertyName)
+ {
+ _propertyName = propertyName;
+ }
+
+ @Override
+ public QueueEntryList<SortedQueueEntryImpl> createQueueEntryList(final AMQQueue queue)
+ {
+ return new SortedQueueEntryList(queue, _propertyName);
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java
index 8bb5d02b01..a43be30b85 100644
--- a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java
@@ -20,17 +20,16 @@
*/
package org.apache.qpid.tools.messagestore.commands;
+import java.io.UnsupportedEncodingException;
+import java.util.LinkedList;
+import java.util.List;
import org.apache.commons.codec.binary.Hex;
-import org.apache.qpid.server.queue.QueueEntryImpl;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.SimpleQueueEntryImpl;
import org.apache.qpid.tools.messagestore.MessageStoreTool;
import org.apache.qpid.tools.utils.Console;
-import java.io.UnsupportedEncodingException;
-import java.util.LinkedList;
-import java.util.List;
-
public class Dump extends Show
{
private static final int LINE_SIZE = 8;
@@ -259,7 +258,7 @@ public class Dump extends Show
String title, boolean routing, boolean headers, boolean messageHeaders)
{
List<QueueEntry> single = new LinkedList<QueueEntry>();
- single.add(new QueueEntryImpl(null,msg, Long.MIN_VALUE));
+ single.add(new SimpleQueueEntryImpl(null,msg, Long.MIN_VALUE));
List<List> routingData = super.createMessageData(null, single, headers, routing, messageHeaders);
diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java
index d2f2ae5eea..9941c00499 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java
@@ -129,7 +129,19 @@ public class QueueConfigurationTest extends TestCase
assertEquals(1, qConf.getMinimumAlertRepeatGap());
}
- private VirtualHostConfiguration overrideConfiguration(String property, int value)
+ public void testSortQueueConfiguration() throws ConfigurationException
+ {
+ //Check default value
+ QueueConfiguration qConf = new QueueConfiguration("test", _emptyConf);
+ assertNull(qConf.getQueueSortKey());
+
+ // Check explicit value
+ final VirtualHostConfiguration vhostConfig = overrideConfiguration("sortKey", "test-sort-key");
+ qConf = new QueueConfiguration("test", vhostConfig);
+ assertEquals("test-sort-key", qConf.getQueueSortKey());
+ }
+
+ private VirtualHostConfiguration overrideConfiguration(String property, Object value)
throws ConfigurationException
{
PropertiesConfiguration queueConfig = new PropertiesConfiguration();
@@ -141,5 +153,4 @@ public class QueueConfigurationTest extends TestCase
return new VirtualHostConfiguration("test", config);
}
-
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 3b7f5f3a51..7b73987abf 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -20,9 +20,17 @@
*/
package org.apache.qpid.server.exchange;
-import junit.framework.TestCase;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -52,17 +60,6 @@ import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.util.InternalBrokerBaseCase;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase
{
private static final Logger _log = Logger.getLogger(AbstractHeadersExchangeTestBase.class);
@@ -483,6 +480,16 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase
{
return false;
}
+
+ public QueueEntry getNextNode()
+ {
+ return null;
+ }
+
+ public QueueEntry getNextValidEntry()
+ {
+ return null;
+ }
};
if(action != null)
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
index 3961b3b355..2ce43052d9 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
@@ -21,12 +21,12 @@ package org.apache.qpid.server.queue;
*/
import java.util.ArrayList;
-
+import junit.framework.AssertionFailedError;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
-import junit.framework.AssertionFailedError;
+import org.apache.qpid.server.message.AMQMessage;
public class AMQPriorityQueueTest extends SimpleAMQQueueTest
{
@@ -35,7 +35,7 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
public void setUp() throws Exception
{
_arguments = new FieldTable();
- _arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, 3);
+ _arguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), 3);
super.setUp();
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
index 7000df157e..12369bd7d4 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
@@ -20,24 +20,23 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.AMQMessage;
public class MockAMQMessage extends AMQMessage
{
public MockAMQMessage(long messageId)
- throws AMQException
{
super(new MockStoredMessage(messageId));
}
-
-
+ public MockAMQMessage(long messageId, String headerName, Object headerValue)
+ {
+ super(new MockStoredMessage(messageId, headerName, headerValue));
+ }
@Override
public long getSize()
{
return 0l;
}
-
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
index ab8850c18c..864b9ad368 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
@@ -234,4 +234,14 @@ public class MockQueueEntry implements QueueEntry
return false;
}
+ public QueueEntry getNextNode()
+ {
+ return null;
+ }
+
+ public QueueEntry getNextValidEntry()
+ {
+ return null;
+ }
+
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java
index 7dc491de4d..78ed3e9f34 100755
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.qpid.framing.FieldTable;
+
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TransactionLog;
import org.apache.qpid.server.store.StoredMessage;
@@ -36,18 +38,32 @@ public class MockStoredMessage implements StoredMessage<MessageMetaData>
private MessageMetaData _metaData;
private final ByteBuffer _content;
-
public MockStoredMessage(long messageId)
{
- this(messageId, new MockMessagePublishInfo(), new ContentHeaderBody(new BasicContentHeaderProperties(), 60));
+ this(messageId, (String)null, null);
+ }
+
+ public MockStoredMessage(long messageId, String headerName, Object headerValue)
+ {
+ this(messageId, new MockMessagePublishInfo(), new ContentHeaderBody(new BasicContentHeaderProperties(), 60), headerName, headerValue);
}
public MockStoredMessage(long messageId, MessagePublishInfo info, ContentHeaderBody chb)
{
+ this(messageId, info, chb, null, null);
+ }
+
+ public MockStoredMessage(long messageId, MessagePublishInfo info, ContentHeaderBody chb, String headerName, Object headerValue)
+ {
_messageId = messageId;
+ if (headerName != null)
+ {
+ FieldTable headers = new FieldTable();
+ headers.setString(headerName, headerValue == null? null :String.valueOf(headerValue));
+ ((BasicContentHeaderProperties)chb.getProperties()).setHeaders(headers);
+ }
_metaData = new MessageMetaData(info, chb, 0);
_content = ByteBuffer.allocate(_metaData.getContentSize());
-
}
public MessageMetaData getMetaData()
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
index d8afd8d829..d336132316 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
@@ -19,9 +19,7 @@
package org.apache.qpid.server.queue;
import java.lang.reflect.Field;
-
import junit.framework.TestCase;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.queue.QueueEntry.EntryState;
@@ -30,18 +28,27 @@ import org.apache.qpid.server.subscription.Subscription;
/**
* Tests for {@link QueueEntryImpl}
- *
*/
-public class QueueEntryImplTest extends TestCase
+public abstract class QueueEntryImplTestBase extends TestCase
{
// tested entry
- private QueueEntryImpl _queueEntry;
+ protected QueueEntryImpl _queueEntry;
+ protected QueueEntryImpl _queueEntry2;
+ protected QueueEntryImpl _queueEntry3;
+
+ public abstract QueueEntryImpl getQueueEntryImpl(int msgid) throws AMQException;
+
+ public abstract void testCompareTo();
+
+ public abstract void testTraverseWithNoDeletedEntries();
+
+ public abstract void testTraverseWithDeletedEntries();
public void setUp() throws Exception
{
- AMQMessage message = new MockAMQMessage(1);
- SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test"));
- _queueEntry = new QueueEntryImpl(queueEntryList, message, 1);
+ _queueEntry = getQueueEntryImpl(1);
+ _queueEntry2 = getQueueEntryImpl(2);
+ _queueEntry3 = getQueueEntryImpl(3);
}
public void testAquire()
@@ -105,61 +112,6 @@ public class QueueEntryImplTest extends TestCase
}
/**
- * Tests if entries in DEQUQUED or DELETED state are not returned by getNext method.
- */
- public void testGetNext()
- {
- int numberOfEntries = 5;
- QueueEntryImpl[] entries = new QueueEntryImpl[numberOfEntries];
- SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test"));
-
- // create test entries
- for(int i = 0; i < numberOfEntries ; i++)
- {
- AMQMessage message = null;;
- try
- {
- message = new MockAMQMessage(i);
- }
- catch (AMQException e)
- {
- fail("Failure to create a mock message:" + e.getMessage());
- }
- QueueEntryImpl entry = (QueueEntryImpl)queueEntryList.add(message);
- entries[i] = entry;
- }
-
- // test getNext for not acquired entries
- for(int i = 0; i < numberOfEntries ; i++)
- {
- QueueEntryImpl queueEntry = entries[i];
- QueueEntryImpl next = queueEntry.getNext();
- if (i < numberOfEntries - 1)
- {
- assertEquals("Unexpected entry from QueueEntryImpl#getNext()", entries[i + 1], next);
- }
- else
- {
- assertNull("The next entry after the last should be null", next);
- }
- }
-
- // delete second
- entries[1].acquire();
- entries[1].delete();
-
- // dequeue third
- entries[2].acquire();
- entries[2].dequeue();
-
- QueueEntryImpl next = entries[0].getNext();
- assertEquals("expected forth entry",entries[3], next);
- next = next.getNext();
- assertEquals("expected fifth entry", entries[4], next);
- next = next.getNext();
- assertNull("The next entry after the last should be null", next);
- }
- /**
* A helper method to put tested object into deleted state and assert the state
*/
private void delete()
@@ -244,4 +196,52 @@ public class QueueEntryImplTest extends TestCase
assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(subId));
assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(sub2Id));
}
+
+ /**
+ * Tests if entries in DEQUQUED or DELETED state are not returned by getNext method.
+ */
+ public void testGetNext()
+ {
+ int numberOfEntries = 5;
+ QueueEntryImpl[] entries = new QueueEntryImpl[numberOfEntries];
+ SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test"));
+
+ // create test entries
+ for(int i = 0; i < numberOfEntries ; i++)
+ {
+ AMQMessage message = new MockAMQMessage(i);
+ QueueEntryImpl entry = (QueueEntryImpl)queueEntryList.add(message);
+ entries[i] = entry;
+ }
+
+ // test getNext for not acquired entries
+ for(int i = 0; i < numberOfEntries ; i++)
+ {
+ QueueEntryImpl queueEntry = entries[i];
+ QueueEntry next = queueEntry.getNextValidEntry();
+ if (i < numberOfEntries - 1)
+ {
+ assertEquals("Unexpected entry from QueueEntryImpl#getNext()", entries[i + 1], next);
+ }
+ else
+ {
+ assertNull("The next entry after the last should be null", next);
+ }
+ }
+
+ // delete second
+ entries[1].acquire();
+ entries[1].delete();
+
+ // dequeue third
+ entries[2].acquire();
+ entries[2].dequeue();
+
+ QueueEntry next = entries[0].getNextValidEntry();
+ assertEquals("expected forth entry",entries[3], next);
+ next = next.getNextValidEntry();
+ assertEquals("expected fifth entry", entries[4], next);
+ next = next.getNextValidEntry();
+ assertNull("The next entry after the last should be null", next);
+ }
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
new file mode 100644
index 0000000000..7a3f6f701c
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import junit.framework.TestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.ServerMessage;
+
+/**
+ * Abstract test class for QueueEntryList implementations.
+ */
+public abstract class QueueEntryListTestBase extends TestCase
+{
+ protected static final AMQQueue _testQueue = new MockAMQQueue("test");
+ public abstract QueueEntryList<QueueEntry> getTestList();
+ public abstract long getExpectedFirstMsgId();
+ public abstract int getExpectedListLength();
+ public abstract ServerMessage getTestMessageToAdd() throws AMQException;
+
+ public void testGetQueue()
+ {
+ assertEquals("Unexpected head entry returned by getHead()", getTestList().getQueue(), _testQueue);
+ }
+
+ /**
+ * Test to add a message with properties specific to the queue type.
+ * @see QueueEntryListTestBase#getTestList()
+ * @see QueueEntryListTestBase#getTestMessageToAdd()
+ * @throws AMQException
+ */
+ public void testAddSpecificMessage() throws AMQException
+ {
+ final QueueEntryList<QueueEntry> list = getTestList();
+ list.add(getTestMessageToAdd());
+
+ final QueueEntryIterator<?> iter = list.iterator();
+ int count = 0;
+ while(iter.advance())
+ {
+ iter.getNode();
+ count++;
+ }
+ assertEquals("List did not grow by one entry after an add", getExpectedListLength() + 1, count);
+ }
+
+ /**
+ * Test to add a generic mock message.
+ * @see QueueEntryListTestBase#getTestList()
+ * @see QueueEntryListTestBase#getExpectedListLength()
+ * @see MockAMQMessage
+ * @throws AMQException
+ */
+ public void testAddGenericMessage() throws AMQException
+ {
+ final QueueEntryList<QueueEntry> list = getTestList();
+ list.add(new MockAMQMessage(666));
+
+ final QueueEntryIterator<?> iter = list.iterator();
+ int count = 0;
+ while(iter.advance())
+ {
+ iter.getNode();
+ count++;
+ }
+ assertEquals("List did not grow by one entry after a generic message added", getExpectedListLength() + 1, count);
+
+ }
+
+ /**
+ * Test for getting the next element in a queue list.
+ * @see QueueEntryListTestBase#getTestList()
+ * @see QueueEntryListTestBase#getExpectedListLength()
+ */
+ public void testListNext()
+ {
+ final QueueEntryList<QueueEntry> entryList = getTestList();
+ QueueEntry entry = entryList.getHead();
+ int count = 0;
+ while(entryList.next(entry) != null)
+ {
+ entry = entryList.next(entry);
+ count++;
+ }
+ assertEquals("Get next didnt get all the list entries", getExpectedListLength(), count);
+ }
+
+ /**
+ * Basic test for the associated QueueEntryIterator implementation.
+ * @see QueueEntryListTestBase#getTestList()
+ * @see QueueEntryListTestBase#getExpectedListLength()
+ */
+ public void testIterator()
+ {
+ final QueueEntryIterator<?> iter = getTestList().iterator();
+ int count = 0;
+ while(iter.advance())
+ {
+ iter.getNode();
+ count++;
+ }
+ assertEquals("Iterator invalid", getExpectedListLength(), count);
+ }
+
+ /**
+ * Test for associated QueueEntryIterator implementation that checks it handles "removed" messages.
+ * @see QueueEntryListTestBase#getTestList()
+ * @see QueueEntryListTestBase#getExpectedListLength()
+ */
+ public void testDequedMessagedNotPresentInIterator() throws Exception
+ {
+ final int numberOfMessages = getExpectedListLength();
+ final QueueEntryList<QueueEntry> entryList = getTestList();
+
+ // dequeue all even messages
+ final QueueEntryIterator<?> it1 = entryList.iterator();
+ int counter = 0;
+ while (it1.advance())
+ {
+ final QueueEntry queueEntry = it1.getNode();
+ if(counter++ % 2 == 0)
+ {
+ queueEntry.acquire();
+ queueEntry.dequeue();
+ }
+ }
+
+ // iterate and check that dequeued messages are not returned by iterator
+ final QueueEntryIterator<?> it2 = entryList.iterator();
+ int counter2 = 0;
+ while(it2.advance())
+ {
+ it2.getNode();
+ counter2++;
+ }
+ final int expectedNumber = numberOfMessages / 2;
+ assertEquals("Expected " + expectedNumber + " number of entries in iterator but got " + counter2,
+ expectedNumber, counter2);
+ }
+
+ /**
+ * Test to verify the head of the queue list is returned as expected.
+ * @see QueueEntryListTestBase#getTestList()
+ * @see QueueEntryListTestBase#getExpectedFirstMsgId()
+ */
+ public void testGetHead()
+ {
+ final QueueEntry head = getTestList().getHead();
+ assertNull("Head entry should not contain an actual message", head.getMessage());
+ assertEquals("Unexpected message id for first list entry", getExpectedFirstMsgId(), getTestList().next(head)
+ .getMessage().getMessageNumber().longValue());
+ }
+
+ /**
+ * Test to verify the entry deletion handled correctly.
+ * @see QueueEntryListTestBase#getTestList()
+ */
+ public void testEntryDeleted()
+ {
+ final QueueEntry head = getTestList().getHead();
+
+ final QueueEntry first = getTestList().next(head);
+ first.delete();
+
+ final QueueEntry second = getTestList().next(head);
+ assertNotSame("After deletion the next entry should be different", first.getMessage().getMessageNumber(), second
+ .getMessage().getMessageNumber());
+
+ final QueueEntry third = getTestList().next(first);
+ assertEquals("After deletion the deleted nodes next node should be the same as the next from head", second
+ .getMessage().getMessageNumber(), third.getMessage().getMessageNumber());
+ }
+
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryTest.java
deleted file mode 100644
index b67723dd25..0000000000
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryTest.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.test.utils.QpidTestCase;
-
-/**
- *
- * Tests QueueEntry
- *
- */
-public class QueueEntryTest extends QpidTestCase
-{
- private QueueEntryImpl _queueEntry1 = null;
- private QueueEntryImpl _queueEntry2 = null;
- private QueueEntryImpl _queueEntry3 = null;
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
-
- int i = 0;
-
- SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(null);
- _queueEntry1 = (QueueEntryImpl) queueEntryList.add(new MockAMQMessage(i++));
- _queueEntry2 = (QueueEntryImpl) queueEntryList.add(new MockAMQMessage(i++));
- _queueEntry3 = (QueueEntryImpl) queueEntryList.add(new MockAMQMessage(i++));
- }
-
- public void testCompareTo()
- {
- assertTrue(_queueEntry1.compareTo(_queueEntry2) < 0);
- assertTrue(_queueEntry2.compareTo(_queueEntry1) > 0);
- assertTrue(_queueEntry1.compareTo(_queueEntry1) == 0);
- }
-
- /**
- * Tests that the getNext() can be used to traverse the list.
- */
- public void testTraverseWithNoDeletedEntries()
- {
- QueueEntryImpl current = _queueEntry1;
-
- current = current.getNext();
- assertSame("Unexpected current entry",_queueEntry2, current);
-
- current = current.getNext();
- assertSame("Unexpected current entry",_queueEntry3, current);
-
- current = current.getNext();
- assertNull(current);
-
- }
-
- /**
- * Tests that the getNext() can be used to traverse the list but deleted
- * entries are skipped and de-linked from the chain of entries.
- */
- public void testTraverseWithDeletedEntries()
- {
- // Delete 2nd queue entry
- _queueEntry2.delete();
- assertTrue(_queueEntry2.isDeleted());
-
-
- QueueEntryImpl current = _queueEntry1;
-
- current = current.getNext();
- assertSame("Unexpected current entry",_queueEntry3, current);
-
- current = current.getNext();
- assertNull(current);
-
- // Assert the side effects of getNext()
- assertSame("Next node of entry 1 should now be entry 3",
- _queueEntry3, _queueEntry1.nextNode());
- }
-}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java
new file mode 100644
index 0000000000..7ff693e4c4
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import junit.framework.Assert;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.SortedQueueEntryImpl.Colour;
+
+/**
+ * Test extension of SortedQueueEntryList that provides data structure validation tests.
+ * @see SortedQueueEntryList
+ */
+public class SelfValidatingSortedQueueEntryList extends SortedQueueEntryList
+{
+ public SelfValidatingSortedQueueEntryList(AMQQueue queue, String propertyName)
+ {
+ super(queue, propertyName);
+ }
+
+ @Override /** Overridden to automatically check queue properties before and after. */
+ public SortedQueueEntryImpl add(final ServerMessage message)
+ {
+ assertQueueProperties(); //before add
+ final SortedQueueEntryImpl result = super.add(message);
+ assertQueueProperties(); //after add
+ return result;
+ }
+
+ @Override /** Overridden to automatically check queue properties before and after. */
+ public void entryDeleted(SortedQueueEntryImpl entry)
+ {
+ assertQueueProperties(); //before delete
+ super.entryDeleted(entry);
+ assertQueueProperties(); //after delete
+ }
+
+ public void assertQueueProperties()
+ {
+ assertRootIsBlack();
+ assertTreeIntegrity();
+ assertChildrenOfRedAreBlack();
+ assertLeavesSameBlackPath();
+ }
+
+ public void assertRootIsBlack()
+ {
+ if(!isNodeColour(getRoot(), Colour.BLACK))
+ {
+ Assert.fail("Root Not Black");
+ }
+ }
+
+ public void assertTreeIntegrity()
+ {
+ assertTreeIntegrity(getRoot());
+ }
+
+ public void assertTreeIntegrity(final SortedQueueEntryImpl node)
+ {
+ if(node == null)
+ {
+ return;
+ }
+ if(node.getLeft() != null)
+ {
+ if(node.getLeft().getParent() == node)
+ {
+ assertTreeIntegrity(node.getLeft());
+ }
+ else
+ {
+ Assert.fail("Tree integrity compromised");
+ }
+ }
+ if(node.getRight() != null)
+ {
+ if(node.getRight().getParent() == node)
+ {
+ assertTreeIntegrity(node.getRight());
+ }
+ else
+ {
+ Assert.fail("Tree integrity compromised");
+ }
+
+ }
+ }
+
+ public void assertLeavesSameBlackPath()
+ {
+ assertLeavesSameBlackPath(getRoot());
+ }
+
+ public int assertLeavesSameBlackPath(final SortedQueueEntryImpl node)
+ {
+ if(node == null)
+ {
+ return 1;
+ }
+ final int left = assertLeavesSameBlackPath(node.getLeft());
+ final int right = assertLeavesSameBlackPath(node.getLeft());
+ if(left == right)
+ {
+ return isNodeColour(node, Colour.BLACK) ? 1 + left : left;
+ }
+ else
+ {
+ Assert.fail("Unequal paths to leaves");
+ return 1; //compiler
+ }
+ }
+
+ public void assertChildrenOfRedAreBlack()
+ {
+ assertChildrenOfRedAreBlack(getRoot());
+ }
+
+ public void assertChildrenOfRedAreBlack(final SortedQueueEntryImpl node)
+ {
+ if(node == null)
+ {
+ return;
+ }
+ else if(node.getColour() == Colour.BLACK)
+ {
+ assertChildrenOfRedAreBlack(node.getLeft());
+ assertChildrenOfRedAreBlack(node.getRight());
+ }
+ else
+ {
+ if(isNodeColour(node.getLeft(), Colour.BLACK)
+ && isNodeColour(node.getRight(), Colour.BLACK))
+ {
+ assertChildrenOfRedAreBlack(node.getLeft());
+ assertChildrenOfRedAreBlack(node.getRight());
+ }
+ else
+ {
+ Assert.fail("Children of Red are not both black");
+ }
+ }
+ }
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index f4cdbbe02c..e4ed232f13 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -21,8 +21,12 @@
package org.apache.qpid.server.queue;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.configuration.PropertiesConfiguration;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQSecurityException;
@@ -40,6 +44,7 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue.PostEnqueueAction;
import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.subscription.MockSubscription;
@@ -51,12 +56,6 @@ import org.apache.qpid.server.util.InternalBrokerBaseCase;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
public class SimpleAMQQueueTest extends InternalBrokerBaseCase
{
@@ -1110,9 +1109,9 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
* Entries with even message id are considered
* dequeued!
*/
- protected QueueEntryImpl createQueueEntry(final ServerMessage message)
+ protected SimpleQueueEntryImpl createQueueEntry(final ServerMessage message)
{
- return new QueueEntryImpl(this, message)
+ return new SimpleQueueEntryImpl(this, message)
{
public boolean isDequeued()
{
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
new file mode 100644
index 0000000000..d8d78bbb84
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
@@ -0,0 +1,59 @@
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.ServerMessage;
+
+public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase {
+
+ private SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test"));
+
+ public QueueEntryImpl getQueueEntryImpl(int msgId) throws AMQException {
+ ServerMessage message = new MockAMQMessage(msgId);
+ return queueEntryList.add(message);
+ }
+
+ public void testCompareTo()
+ {
+ assertTrue(_queueEntry.compareTo(_queueEntry2) < 0);
+ assertTrue(_queueEntry2.compareTo(_queueEntry3) < 0);
+ assertTrue(_queueEntry.compareTo(_queueEntry3) < 0);
+
+ assertTrue(_queueEntry2.compareTo(_queueEntry) > 0);
+ assertTrue(_queueEntry3.compareTo(_queueEntry2) > 0);
+ assertTrue(_queueEntry3.compareTo(_queueEntry) > 0);
+
+ assertTrue(_queueEntry.compareTo(_queueEntry) == 0);
+ assertTrue(_queueEntry2.compareTo(_queueEntry2) == 0);
+ assertTrue(_queueEntry3.compareTo(_queueEntry3) == 0);
+ }
+
+ public void testTraverseWithNoDeletedEntries()
+ {
+ QueueEntry current = _queueEntry;
+
+ current = current.getNextValidEntry();
+ assertSame("Unexpected current entry",_queueEntry2, current);
+
+ current = current.getNextValidEntry();
+ assertSame("Unexpected current entry",_queueEntry3, current);
+
+ current = current.getNextValidEntry();
+ assertNull(current);
+
+ }
+
+ public void testTraverseWithDeletedEntries()
+ {
+ // Delete 2nd queue entry
+ _queueEntry2.delete();
+ assertTrue(_queueEntry2.isDeleted());
+
+ QueueEntry current = _queueEntry;
+
+ current = current.getNextValidEntry();
+ assertSame("Unexpected current entry",_queueEntry3, current);
+
+ current = current.getNextValidEntry();
+ assertNull(current);
+ }
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java
index 7136f07ca5..f3ba6a5495 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java
@@ -22,21 +22,28 @@ package org.apache.qpid.server.queue;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.ServerMessage;
-import junit.framework.TestCase;
-
-public class SimpleQueueEntryListTest extends TestCase
+public class SimpleQueueEntryListTest extends QueueEntryListTestBase
{
+ private SimpleQueueEntryList _sqel;
+
private static final String SCAVENGE_PROP = "qpid.queue.scavenge_count";
String oldScavengeValue = null;
-
+
@Override
protected void setUp()
{
oldScavengeValue = System.setProperty(SCAVENGE_PROP, "9");
+ _sqel = new SimpleQueueEntryList(_testQueue);
+ for(int i = 1; i <= 100; i++)
+ {
+ final ServerMessage msg = new MockAMQMessage(i);
+ final QueueEntry bleh = _sqel.add(msg);
+ assertNotNull("QE should not have been null", bleh);
+ }
}
@Override
@@ -52,19 +59,28 @@ public class SimpleQueueEntryListTest extends TestCase
}
}
- /**
- * Tests the behavior of the next(QueuyEntry) method.
- */
- public void testNext() throws Exception
+ @Override
+ public QueueEntryList getTestList()
{
- SimpleQueueEntryList sqel = new SimpleQueueEntryList(null);
- int i = 0;
+ return _sqel;
+ }
+
+ @Override
+ public long getExpectedFirstMsgId()
+ {
+ return 1;
+ }
- QueueEntry queueEntry1 = sqel.add(new MockAMQMessage(i++));
- QueueEntry queueEntry2 = sqel.add(new MockAMQMessage(i++));
+ @Override
+ public int getExpectedListLength()
+ {
+ return 100;
+ }
- assertSame(queueEntry2, sqel.next(queueEntry1));
- assertNull(sqel.next(queueEntry2));
+ @Override
+ public AMQMessage getTestMessageToAdd() throws AMQException
+ {
+ return new MockAMQMessage(1l);
}
public void testScavenge() throws Exception
@@ -82,7 +98,7 @@ public class SimpleQueueEntryListTest extends TestCase
entriesMap.put(i,bleh);
}
- QueueEntryImpl head = ((QueueEntryImpl) sqel.getHead());
+ SimpleQueueEntryImpl head = sqel.getHead();
//We shall now delete some specific messages mid-queue that will lead to
//requiring a scavenge once the requested threshold of 9 deletes is passed
@@ -99,11 +115,10 @@ public class SimpleQueueEntryListTest extends TestCase
assertTrue("Failed to delete QueueEntry", entriesMap.remove(14).delete());
verifyDeletedButPresentBeforeScavenge(head, 14);
-
//Delete message 20 only
assertTrue("Failed to delete QueueEntry", entriesMap.remove(20).delete());
verifyDeletedButPresentBeforeScavenge(head, 20);
-
+
//Delete messages 81 to 84
assertTrue("Failed to delete QueueEntry", entriesMap.remove(81).delete());
verifyDeletedButPresentBeforeScavenge(head, 81);
@@ -113,35 +128,35 @@ public class SimpleQueueEntryListTest extends TestCase
verifyDeletedButPresentBeforeScavenge(head, 83);
assertTrue("Failed to delete QueueEntry", entriesMap.remove(84).delete());
verifyDeletedButPresentBeforeScavenge(head, 84);
-
+
//Delete message 99 - this is the 10th message deleted that is after the queue head
//and so will invoke the scavenge() which is set to go after 9 previous deletions
assertTrue("Failed to delete QueueEntry", entriesMap.remove(99).delete());
verifyAllDeletedMessagedNotPresent(head, entriesMap);
}
-
- private void verifyDeletedButPresentBeforeScavenge(QueueEntryImpl head, long messageId)
+
+ private void verifyDeletedButPresentBeforeScavenge(SimpleQueueEntryImpl head, long messageId)
{
//Use the head to get the initial entry in the queue
- QueueEntryImpl entry = head._next;
-
+ SimpleQueueEntryImpl entry = head.getNextNode();
+
for(long i = 1; i < messageId ; i++)
{
assertEquals("Expected QueueEntry was not found in the list", i, (long) entry.getMessage().getMessageNumber());
- entry = entry._next;
+ entry = entry.getNextNode();
}
-
+
assertTrue("Entry should have been deleted", entry.isDeleted());
}
-
- private void verifyAllDeletedMessagedNotPresent(QueueEntryImpl head, Map<Integer,QueueEntry> remainingMessages)
+
+ private void verifyAllDeletedMessagedNotPresent(SimpleQueueEntryImpl head, Map<Integer,QueueEntry> remainingMessages)
{
//Use the head to get the initial entry in the queue
- QueueEntryImpl entry = head._next;
-
+ SimpleQueueEntryImpl entry = head.getNextNode();
+
assertNotNull("Initial entry should not have been null", entry);
-
+
int count = 0;
while (entry != null)
@@ -149,62 +164,56 @@ public class SimpleQueueEntryListTest extends TestCase
assertFalse("Entry " + entry.getMessage().getMessageNumber() + " should not have been deleted", entry.isDeleted());
assertNotNull("QueueEntry was not found in the list of remaining entries",
remainingMessages.get(entry.getMessage().getMessageNumber().intValue()));
-
+
count++;
- entry = entry._next;
+ entry = entry.getNextNode();
}
-
+
assertEquals("Count should have been equal",count,remainingMessages.size());
}
- public void testDequedMessagedNotPresentInIterator()
+ public void testGettingNextElement()
{
- int numberOfMessages = 10;
- SimpleQueueEntryList entryList = new SimpleQueueEntryList(new MockAMQQueue("test"));
- QueueEntry[] entries = new QueueEntry[numberOfMessages];
+ final int numberOfEntries = 5;
+ final SimpleQueueEntryImpl[] entries = new SimpleQueueEntryImpl[numberOfEntries];
+ final SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test"));
- for(int i = 0; i < numberOfMessages ; i++)
+ // create test entries
+ for(int i = 0; i < numberOfEntries; i++)
{
- AMQMessage message = null;;
- try
- {
- message = new MockAMQMessage(i);
- }
- catch (AMQException e)
- {
- fail("Failure to create a mock message:" + e.getMessage());
- }
- QueueEntry entry = entryList.add(message);
- assertNotNull("QE should not be null", entry);
- entries[i]= entry;
+ AMQMessage message = new MockAMQMessage(i);
+ entries[i] = queueEntryList.add(message);
}
- // dequeue all even messages
- for (QueueEntry queueEntry : entries)
+ // test getNext for not acquired entries
+ for(int i = 0; i < numberOfEntries; i++)
{
- long i = ((AMQMessage)queueEntry.getMessage()).getMessageId().longValue();
- if (i%2 == 0)
+ final SimpleQueueEntryImpl next = entries[i].getNextValidEntry();
+
+ if(i < numberOfEntries - 1)
+ {
+ assertEquals("Unexpected entry from QueueEntryImpl#getNext()", entries[i + 1], next);
+ }
+ else
{
- queueEntry.acquire();
- queueEntry.dequeue();
+ assertNull("The next entry after the last should be null", next);
}
}
- // iterate and check that dequeued messages are not returned by iterator
- QueueEntryIterator it = entryList.iterator();
- int counter = 0;
- int i = 1;
- while (it.advance())
- {
- QueueEntry entry = it.getNode();
- Long id = ((AMQMessage)entry.getMessage()).getMessageId();
- assertEquals("Expected message with id " + i + " but got message with id "
- + id, new Long(i), id);
- counter++;
- i += 2;
- }
- int expectedNumber = numberOfMessages / 2;
- assertEquals("Expected " + expectedNumber + " number of entries in iterator but got " + counter,
- expectedNumber, counter);
+ // delete second
+ entries[1].acquire();
+ entries[1].delete();
+
+ // dequeue third
+ entries[2].acquire();
+ entries[2].dequeue();
+
+ SimpleQueueEntryImpl next = entries[2].getNextValidEntry();
+ assertEquals("expected forth entry", entries[3], next);
+ next = next.getNextValidEntry();
+ assertEquals("expected fifth entry", entries[4], next);
+ next = next.getNextValidEntry();
+ assertNull("The next entry after the last should be null", next);
}
+
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java
new file mode 100644
index 0000000000..43fb5b4cb3
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java
@@ -0,0 +1,62 @@
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.ServerMessage;
+
+public class SortedQueueEntryImplTest extends QueueEntryImplTestBase {
+
+ public final static String keys[] = { "CCC", "AAA", "BBB" };
+
+ private SelfValidatingSortedQueueEntryList queueEntryList = new SelfValidatingSortedQueueEntryList(new MockAMQQueue("test"),"KEY");
+
+ public QueueEntryImpl getQueueEntryImpl(int msgId) throws AMQException {
+ final ServerMessage message = new MockAMQMessage(msgId, "KEY", keys[msgId-1]);
+ return queueEntryList.add(message);
+ }
+
+ public void testCompareTo()
+ {
+ assertTrue(_queueEntry.compareTo(_queueEntry2) > 0);
+ assertTrue(_queueEntry.compareTo(_queueEntry3) > 0);
+
+ assertTrue(_queueEntry2.compareTo(_queueEntry3) < 0);
+ assertTrue(_queueEntry2.compareTo(_queueEntry) < 0);
+
+ assertTrue(_queueEntry3.compareTo(_queueEntry2) > 0);
+ assertTrue(_queueEntry3.compareTo(_queueEntry) < 0);
+
+ assertTrue(_queueEntry.compareTo(_queueEntry) == 0);
+ assertTrue(_queueEntry2.compareTo(_queueEntry2) == 0);
+ assertTrue(_queueEntry3.compareTo(_queueEntry3) == 0);
+ }
+
+ public void testTraverseWithNoDeletedEntries()
+ {
+ QueueEntry current = _queueEntry2;
+
+ current = current.getNextValidEntry();
+ assertSame("Unexpected current entry",_queueEntry3, current);
+
+ current = current.getNextValidEntry();
+ assertSame("Unexpected current entry",_queueEntry, current);
+
+ current = current.getNextValidEntry();
+ assertNull(current);
+
+ }
+
+ public void testTraverseWithDeletedEntries()
+ {
+ // Delete 2nd queue entry
+ _queueEntry3.delete();
+ assertTrue(_queueEntry3.isDeleted());
+
+ QueueEntry current = _queueEntry2;
+
+ current = current.getNextValidEntry();
+ assertSame("Unexpected current entry",_queueEntry, current);
+
+ current = current.getNextValidEntry();
+ assertNull(current);
+ }
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
new file mode 100644
index 0000000000..eca845644e
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
@@ -0,0 +1,323 @@
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.message.AMQMessage;
+
+import java.util.Arrays;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.ServerMessage;
+
+public class SortedQueueEntryListTest extends QueueEntryListTestBase
+{
+ private static SelfValidatingSortedQueueEntryList _sqel;
+
+ public final static String keys[] = { " 73", " 18", " 11", "127", "166", "163", " 69", " 60", "191", "144",
+ " 17", "161", "145", "140", "157", " 47", "136", " 56", "176", " 81",
+ "195", " 96", " 2", " 68", "101", "141", "159", "187", "149", " 45",
+ " 64", "100", " 83", " 51", " 79", " 82", "180", " 26", " 61", " 62",
+ " 78", " 46", "147", " 91", "120", "164", " 92", "172", "188", " 50",
+ "111", " 89", " 4", " 8", " 16", "151", "122", "178", " 33", "124",
+ "171", "165", "116", "113", "155", "148", " 29", " 0", " 37", "131",
+ "146", " 57", "112", " 97", " 23", "108", "123", "117", "167", " 52",
+ " 98", " 6", "160", " 25", " 49", " 34", "182", "185", " 30", " 66",
+ "152", " 58", " 86", "118", "189", " 84", " 36", "104", " 7", " 76",
+ " 87", " 1", " 80", " 10", "142", " 59", "137", " 12", " 67", " 22",
+ " 9", "106", " 75", "109", " 93", " 42", "177", "134", " 77", " 88",
+ "114", " 43", "143", "135", " 55", "181", " 32", "174", "175", "184",
+ "133", "107", " 28", "126", "103", " 85", " 38", "158", " 39", "162",
+ "129", "194", " 15", " 24", " 19", " 35", "186", " 31", " 65", " 99",
+ "192", " 74", "156", " 27", " 95", " 54", " 70", " 13", "110", " 41",
+ " 90", "173", "125", "196", "130", "183", "102", "190", "132", "105",
+ " 21", " 53", "139", " 94", "115", " 48", " 44", "179", "128", " 14",
+ " 72", "119", "153", "168", "197", " 40", "150", "138", " 5", "154",
+ "169", " 71", "199", "198", "170", " 3", "121", " 20", " 63", "193" };
+
+ public final static String textkeys[] = { "AAA", "BBB", "CCC", "DDD", "EEE", "FFF", "GGG", "HHH", "III", "JJJ",
+ "KKK", "LLL", "MMM", "NNN", "OOO", "PPP", "QQQ", "RRR", "SSS", "TTT",
+ "UUU", "VVV", "XXX", "YYY", "ZZZ"};
+
+ private final static String keysSorted[] = keys.clone();
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ // Create result array
+ Arrays.sort(keysSorted);
+
+ // Create test list
+ _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY");
+
+ // Build test list
+ long messageId = 0L;
+ for(final String key : keys)
+ {
+ final ServerMessage msg = generateTestMessage(messageId++, key);
+ _sqel.add(msg);
+ }
+
+ }
+
+ public QueueEntryList getTestList()
+ {
+ return _sqel;
+ }
+
+ public int getExpectedListLength()
+ {
+ return keys.length;
+ }
+
+ public long getExpectedFirstMsgId()
+ {
+ return 67L;
+ }
+
+ public ServerMessage getTestMessageToAdd() throws AMQException
+ {
+ return generateTestMessage(1, "test value");
+ }
+
+ private ServerMessage generateTestMessage(final long id, final String keyValue) throws AMQException
+ {
+ return new AMQMessage(new MockStoredMessage(id, "KEY", keyValue));
+ }
+
+ public void testIterator()
+ {
+ super.testIterator();
+
+ // Test sorted order of list
+ final QueueEntryIterator<?> iter = getTestList().iterator();
+ int count = 0;
+ while(iter.advance())
+ {
+ assertEquals("Sorted queue entry value does not match sorted key array",
+ keysSorted[count++], getSortedKeyValue(iter));
+ }
+ }
+
+ private Object getSortedKeyValue(QueueEntryIterator<?> iter)
+ {
+ return ((SortedQueueEntryImpl) iter.getNode()).getMessage().getMessageHeader().getHeader("KEY");
+ }
+
+ private Long getMessageId(QueueEntryIterator<?> iter)
+ {
+ return ((SortedQueueEntryImpl) iter.getNode()).getMessage().getMessageNumber();
+ }
+
+ public void testNonUniqueSortKeys() throws Exception
+ {
+ _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY");
+
+ // Build test list
+ long messageId = 0L;
+ while(messageId < 200)
+ {
+ final ServerMessage msg = generateTestMessage(messageId++, "samekey");
+ _sqel.add(msg);
+ }
+
+ final QueueEntryIterator<?> iter = getTestList().iterator();
+ int count=0;
+ while(iter.advance())
+ {
+ assertEquals("Sorted queue entry value is not as expected", "samekey", getSortedKeyValue(iter));
+ assertEquals("Message id not as expected", Long.valueOf(count++), getMessageId(iter));
+ }
+ }
+
+ public void testNullSortKeys() throws Exception
+ {
+ _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY");
+
+ // Build test list
+ long messageId = 0L;
+ while(messageId < 200)
+ {
+ final ServerMessage msg = generateTestMessage(messageId++, null);
+ _sqel.add(msg);
+ }
+
+ final QueueEntryIterator<?> iter = getTestList().iterator();
+ int count=0;
+ while(iter.advance())
+ {
+ assertNull("Sorted queue entry value is not as expected", getSortedKeyValue(iter));
+ assertEquals("Message id not as expected", Long.valueOf(count++), getMessageId(iter)); }
+ }
+
+ public void testAscendingSortKeys() throws Exception
+ {
+ _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY");
+
+ // Build test list
+ long messageId = 0L;
+ for(String textKey : textkeys)
+ {
+ final ServerMessage msg = generateTestMessage(messageId, textKey);
+ messageId++;
+ _sqel.add(msg);
+ }
+
+ final QueueEntryIterator<?> iter = getTestList().iterator();
+ int count=0;
+ while(iter.advance())
+ {
+ assertEquals("Sorted queue entry value is not as expected", textkeys[count], getSortedKeyValue(iter));
+ assertEquals("Message id not as expected", Long.valueOf(count), getMessageId(iter));
+ count++;
+ }
+ }
+
+ public void testDescendingSortKeys() throws Exception
+ {
+ _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY");
+
+ // Build test list
+ long messageId = 0L;
+ for(int i=textkeys.length-1; i >=0; i--)
+ {
+ final ServerMessage msg = generateTestMessage(messageId, textkeys[i]);
+ messageId++;
+ _sqel.add(msg);
+ }
+
+ final QueueEntryIterator<?> iter = getTestList().iterator();
+ int count=0;
+ while(iter.advance())
+ {
+ assertEquals("Sorted queue entry value is not as expected", textkeys[count], getSortedKeyValue(iter));
+ assertEquals("Message id not as expected", Long.valueOf(textkeys.length-count-1), getMessageId(iter));
+ count++;
+ }
+ }
+
+ public void testInsertAfter() throws Exception
+ {
+ _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY");
+
+ ServerMessage msg = generateTestMessage(1, "A");
+ _sqel.add(msg);
+
+ SortedQueueEntryImpl entry = _sqel.next(_sqel.getHead());
+ validateEntry(entry, "A", 1);
+
+ msg = generateTestMessage(2, "B");
+ _sqel.add(msg);
+
+ entry = _sqel.next(_sqel.getHead());
+ validateEntry(entry, "A", 1);
+
+ entry = _sqel.next(entry);
+ validateEntry(entry, "B", 2);
+ }
+
+ public void testInsertBefore() throws Exception
+ {
+ _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY");
+
+ ServerMessage msg = generateTestMessage(1, "B");
+ _sqel.add(msg);
+
+ SortedQueueEntryImpl entry = _sqel.next(_sqel.getHead());
+ validateEntry(entry, "B", 1);
+
+ msg = generateTestMessage(2, "A");
+ _sqel.add(msg);
+
+ entry = _sqel.next(_sqel.getHead());
+ validateEntry(entry, "A", 2);
+
+ entry = _sqel.next(entry);
+ validateEntry(entry, "B", 1);
+ }
+
+ public void testInsertInbetween() throws Exception
+ {
+ _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY");
+
+ ServerMessage msg = generateTestMessage(1, "A");
+ _sqel.add(msg);
+ SortedQueueEntryImpl entry = _sqel.next(_sqel.getHead());
+ validateEntry(entry, "A", 1);
+
+ msg = generateTestMessage(2, "C");
+ _sqel.add(msg);
+
+ entry = _sqel.next(_sqel.getHead());
+ validateEntry(entry, "A", 1);
+
+ entry = _sqel.next(entry);
+ validateEntry(entry, "C", 2);
+
+ msg = generateTestMessage(3, "B");
+ _sqel.add(msg);
+
+ entry = _sqel.next(_sqel.getHead());
+ validateEntry(entry, "A", 1);
+
+ entry = _sqel.next(entry);
+ validateEntry(entry, "B", 3);
+
+ entry = _sqel.next(entry);
+ validateEntry(entry, "C", 2);
+ }
+
+ public void testInsertAtHead() throws Exception
+ {
+ _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY");
+
+ ServerMessage msg = generateTestMessage(1, "B");
+ _sqel.add(msg);
+
+ SortedQueueEntryImpl entry = _sqel.next(_sqel.getHead());
+ validateEntry(entry, "B", 1);
+
+ msg = generateTestMessage(2, "D");
+ _sqel.add(msg);
+
+ entry = _sqel.next(_sqel.getHead());
+ validateEntry(entry, "B", 1);
+
+ entry = _sqel.next(entry);
+ validateEntry(entry, "D", 2);
+
+ msg = generateTestMessage(3, "C");
+ _sqel.add(msg);
+
+ entry = _sqel.next(_sqel.getHead());
+ validateEntry(entry, "B", 1);
+
+ entry = _sqel.next(entry);
+ validateEntry(entry, "C", 3);
+
+ entry = _sqel.next(entry);
+ validateEntry(entry, "D", 2);
+
+ msg = generateTestMessage(4, "A");
+ _sqel.add(msg);
+
+ entry = _sqel.next(_sqel.getHead());
+ validateEntry(entry, "A", 4);
+
+ entry = _sqel.next(entry);
+ validateEntry(entry, "B", 1);
+
+ entry = _sqel.next(entry);
+ validateEntry(entry, "C", 3);
+
+ entry = _sqel.next(entry);
+ validateEntry(entry, "D", 2);
+ }
+
+ private void validateEntry(final SortedQueueEntryImpl entry, final String expectedSortKey, final long expectedMessageId)
+ {
+ assertEquals("Sorted queue entry value is not as expected",
+ expectedSortKey, entry.getMessage().getMessageHeader().getHeader("KEY"));
+ assertEquals("Sorted queue entry id is not as expected",
+ Long.valueOf(expectedMessageId), entry.getMessage().getMessageNumber());
+ }
+
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
index 3acd064fd7..1d0a9d6316 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -689,7 +689,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase
if (usePriority)
{
queueArguments = new FieldTable();
- queueArguments.put(AMQQueueFactory.X_QPID_PRIORITIES, DEFAULT_PRIORTY_LEVEL);
+ queueArguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), DEFAULT_PRIORTY_LEVEL);
}
if (lastValueQueue)
@@ -767,7 +767,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase
private void bindAllQueuesToExchange(Exchange exchange, AMQShortString routingKey)
{
FieldTable queueArguments = new FieldTable();
- queueArguments.put(AMQQueueFactory.X_QPID_PRIORITIES, DEFAULT_PRIORTY_LEVEL);
+ queueArguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), DEFAULT_PRIORTY_LEVEL);
QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry();
@@ -781,7 +781,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase
private void bindAllTopicQueuesToExchange(Exchange exchange, AMQShortString routingKey)
{
FieldTable queueArguments = new FieldTable();
- queueArguments.put(AMQQueueFactory.X_QPID_PRIORITIES, DEFAULT_PRIORTY_LEVEL);
+ queueArguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), DEFAULT_PRIORTY_LEVEL);
QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry();
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java
index 2ce1251eab..962aec0d1e 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java
@@ -20,35 +20,26 @@
*/
package org.apache.qpid.server.queue;
-import junit.framework.TestCase;
-import junit.framework.Assert;
-import org.apache.log4j.Logger;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-
-import javax.jms.*;
-import javax.naming.NamingException;
-import javax.naming.Context;
-import javax.naming.spi.InitialContextFactory;
-import java.util.Hashtable;
import java.util.HashMap;
import java.util.Map;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.naming.NamingException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
-public class PriorityTest extends QpidBrokerTestCase
+public class PriorityQueueTest extends QpidBrokerTestCase
{
private static final int TIMEOUT = 1500;
-
- private static final Logger _logger = Logger.getLogger(PriorityTest.class);
-
protected final String QUEUE = "PriorityQueue";
private static final int MSG_COUNT = 50;
@@ -60,9 +51,8 @@ public class PriorityTest extends QpidBrokerTestCase
private Connection consumerConnection;
private Session consumerSession;
-
private MessageConsumer consumer;
-
+
protected void setUp() throws Exception
{
super.setUp();
@@ -71,10 +61,10 @@ public class PriorityTest extends QpidBrokerTestCase
producerSession = producerConnection.createSession(true, Session.AUTO_ACKNOWLEDGE);
producerConnection.start();
-
+
consumerConnection = getConnection();
consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+
}
protected void tearDown() throws Exception
@@ -111,7 +101,7 @@ public class PriorityTest extends QpidBrokerTestCase
Message previous = null;
int messageCount = 0;
while((received = consumer.receive(1000))!=null)
- {
+ {
messageCount++;
if(previous != null)
{
@@ -124,17 +114,17 @@ public class PriorityTest extends QpidBrokerTestCase
assertEquals("Incorrect number of message received", 50, receivedCount);
}
-
+
public void testOddOrdering() throws AMQException, JMSException
{
final Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("x-qpid-priorities",3);
((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
queue = producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
-
+
((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
producer = producerSession.createProducer(queue);
-
+
// In order ABC
producer.setPriority(9);
producer.send(nextMessage(1, false, producerSession, producer));
@@ -151,14 +141,14 @@ public class PriorityTest extends QpidBrokerTestCase
producer.setPriority(1);
producer.send(nextMessage(6, false, producerSession, producer));
- // Out of order BCA
+ // Out of order BCA
producer.setPriority(4);
producer.send(nextMessage(7, false, producerSession, producer));
producer.setPriority(1);
producer.send(nextMessage(8, false, producerSession, producer));
producer.setPriority(9);
producer.send(nextMessage(9, false, producerSession, producer));
-
+
// Reverse order CBA
producer.setPriority(1);
producer.send(nextMessage(10, false, producerSession, producer));
@@ -167,10 +157,10 @@ public class PriorityTest extends QpidBrokerTestCase
producer.setPriority(9);
producer.send(nextMessage(12, false, producerSession, producer));
producerSession.commit();
-
+
consumer = consumerSession.createConsumer(queue);
consumerConnection.start();
-
+
Message msg = consumer.receive(TIMEOUT);
assertEquals(1, msg.getIntProperty("msg"));
msg = consumer.receive(TIMEOUT);
@@ -179,7 +169,7 @@ public class PriorityTest extends QpidBrokerTestCase
assertEquals(9, msg.getIntProperty("msg"));
msg = consumer.receive(TIMEOUT);
assertEquals(12, msg.getIntProperty("msg"));
-
+
msg = consumer.receive(TIMEOUT);
assertEquals(2, msg.getIntProperty("msg"));
msg = consumer.receive(TIMEOUT);
@@ -188,7 +178,7 @@ public class PriorityTest extends QpidBrokerTestCase
assertEquals(7, msg.getIntProperty("msg"));
msg = consumer.receive(TIMEOUT);
assertEquals(11, msg.getIntProperty("msg"));
-
+
msg = consumer.receive(TIMEOUT);
assertEquals(3, msg.getIntProperty("msg"));
msg = consumer.receive(TIMEOUT);
@@ -206,6 +196,4 @@ public class PriorityTest extends QpidBrokerTestCase
return send;
}
-
-
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java
new file mode 100644
index 0000000000..83046b84a5
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java
@@ -0,0 +1,576 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.naming.NamingException;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class SortedQueueTest extends QpidBrokerTestCase
+{
+ private static final Logger LOGGER = Logger.getLogger(SortedQueueTest.class);
+ public static final String TEST_SORT_KEY = "testSortKey";
+ private static final String VALUES[] = SortedQueueEntryListTest.keys.clone();
+ private static final String VALUES_SORTED[] = SortedQueueEntryListTest.keys.clone();
+ public final static String SUBSET_KEYS[] = { "000", "100", "200", "300", "400", "500", "600", "700", "800", "900" };
+
+ private Connection _producerConnection;
+ private Session _producerSession;
+ private Connection _consumerConnection;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, "1");
+ // Sort value array to generated "expected" order of messages.
+ Arrays.sort(VALUES_SORTED);
+ _producerConnection = getConnection();
+ _consumerConnection = getConnection();
+ _producerSession = _producerConnection.createSession(true, Session.SESSION_TRANSACTED);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ _producerSession.close();
+ _producerConnection.close();
+ _consumerConnection.close();
+ super.tearDown();
+ }
+
+ public void testSortOrder() throws JMSException, NamingException, AMQException
+ {
+ final Queue queue = createQueue();
+ final MessageProducer producer = _producerSession.createProducer(queue);
+
+ for(String value : VALUES)
+ {
+ final Message msg = _producerSession.createTextMessage("Message Text:" + value);
+ msg.setStringProperty(TEST_SORT_KEY, value);
+ producer.send(msg);
+ }
+
+ _producerSession.commit();
+ producer.close();
+
+ final Session consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final MessageConsumer consumer = consumerSession.createConsumer(queue);
+ _consumerConnection.start();
+ TextMessage received;
+ int messageCount = 0;
+ while((received = (TextMessage) consumer.receive(1000)) != null)
+ {
+ assertEquals("Received message with unexpected sorted key value", VALUES_SORTED[messageCount],
+ received.getStringProperty(TEST_SORT_KEY));
+ assertEquals("Received message with unexpected message value",
+ "Message Text:" + VALUES_SORTED[messageCount], received.getText());
+ messageCount++;
+ }
+
+ assertEquals("Incorrect number of messages received", VALUES.length, messageCount);
+ }
+
+ public void testAutoAckSortedQueue() throws JMSException, NamingException, AMQException
+ {
+ runThroughSortedQueueForSessionMode(Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void testTransactedSortedQueue() throws JMSException, NamingException, AMQException
+ {
+ runThroughSortedQueueForSessionMode(Session.SESSION_TRANSACTED);
+ }
+
+ public void testClientAckSortedQueue() throws JMSException, NamingException, AMQException
+ {
+ runThroughSortedQueueForSessionMode(Session.CLIENT_ACKNOWLEDGE);
+ }
+
+ private void runThroughSortedQueueForSessionMode(final int sessionMode) throws JMSException, NamingException,
+ AMQException
+ {
+ final Queue queue = createQueue();
+ final MessageProducer producer = _producerSession.createProducer(queue);
+
+ final TestConsumerThread consumerThread = new TestConsumerThread(sessionMode, queue);
+ consumerThread.start();
+ final Calendar cal = Calendar.getInstance(Locale.UK);
+
+ for(String value : VALUES)
+ {
+ final Message msg = _producerSession.createTextMessage(String.valueOf(cal.getTimeInMillis()));
+ msg.setStringProperty(TEST_SORT_KEY, value);
+ producer.send(msg);
+ _producerSession.commit();
+ }
+
+ synchronized(consumerThread)
+ {
+ try
+ {
+ consumerThread.join(5000L);
+ }
+ catch(InterruptedException e)
+ {
+ fail("Test failed waiting for consumer to complete");
+ }
+ }
+ assertTrue("Consumer timed out", consumerThread.isStopped());
+ assertEquals("Incorrect number of messages received", VALUES.length, consumerThread.getConsumed());
+
+ producer.close();
+ }
+
+ public void testSortedQueueWithAscendingSortedKeys() throws JMSException, NamingException, AMQException
+ {
+ final Queue queue = createQueue();
+ final MessageProducer producer = _producerSession.createProducer(queue);
+
+ final TestConsumerThread consumerThread = new TestConsumerThread(Session.AUTO_ACKNOWLEDGE, queue);
+ consumerThread.start();
+
+ for(int i = 0; i < 200; i++)
+ {
+ final String ascendingKey = AscendingSortedKeys.getNextKey();
+ final Message msg = _producerSession.createTextMessage("Message Text:" + ascendingKey);
+ msg.setStringProperty(TEST_SORT_KEY, ascendingKey);
+ producer.send(msg);
+ _producerSession.commit();
+ }
+
+ synchronized(consumerThread)
+ {
+ try
+ {
+ consumerThread.join(5000L);
+ }
+ catch(InterruptedException e)
+ {
+ fail("Test failed waiting for consumer to complete");
+ }
+ }
+ assertTrue("Consumer timed out", consumerThread.isStopped());
+ assertEquals("Incorrect number of messages received", 200, consumerThread.getConsumed());
+
+ producer.close();
+ }
+
+ public void testSortOrderWithNonUniqueKeys() throws JMSException, NamingException, AMQException
+ {
+ final Queue queue = createQueue();
+ final MessageProducer producer = _producerSession.createProducer(queue);
+
+ int count = 0;
+ while(count < 200)
+ {
+ final Message msg = _producerSession.createTextMessage("Message Text:" + count++);
+ msg.setStringProperty(TEST_SORT_KEY, "samesortkeyvalue");
+ producer.send(msg);
+ }
+
+ _producerSession.commit();
+ producer.close();
+
+ final Session consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final MessageConsumer consumer = consumerSession.createConsumer(queue);
+ _consumerConnection.start();
+ TextMessage received = null;
+ int messageCount = 0;
+
+ while((received = (TextMessage) consumer.receive(1000)) != null)
+ {
+ assertEquals("Received message with unexpected sorted key value", "samesortkeyvalue",
+ received.getStringProperty(TEST_SORT_KEY));
+ assertEquals("Received message with unexpected message value", "Message Text:" + messageCount,
+ received.getText());
+ messageCount++;
+ }
+
+ assertEquals("Incorrect number of messages received", 200, messageCount);
+ }
+
+ public void testSortOrderWithUniqueKeySubset() throws JMSException, NamingException, AMQException
+ {
+ final Queue queue = createQueue();
+ final MessageProducer producer = _producerSession.createProducer(queue);
+
+ int count = 0;
+ while(count < 100)
+ {
+ int keyValueIndex = count % 10;
+ final Message msg = _producerSession.createTextMessage("Message Text:" + count);
+ msg.setStringProperty(TEST_SORT_KEY, SUBSET_KEYS[keyValueIndex]);
+ producer.send(msg);
+ count++;
+ }
+
+ _producerSession.commit();
+ producer.close();
+
+ final Session consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final MessageConsumer consumer = consumerSession.createConsumer(queue);
+ _consumerConnection.start();
+ TextMessage received;
+ int messageCount = 0;
+
+ while((received = (TextMessage) consumer.receive(1000)) != null)
+ {
+ assertEquals("Received message with unexpected sorted key value", SUBSET_KEYS[messageCount / 10],
+ received.getStringProperty(TEST_SORT_KEY));
+ messageCount++;
+ }
+
+ assertEquals("Incorrect number of messages received", 100, messageCount);
+ }
+
+ public void testGetNext() throws JMSException, NamingException, AMQException
+ {
+ final Queue queue = createQueue();
+ final MessageProducer producer = _producerSession.createProducer(queue);
+
+ sendAndCommitMessage(producer,"2");
+ sendAndCommitMessage(producer,"3");
+ sendAndCommitMessage(producer,"1");
+
+ final Session consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final MessageConsumer consumer = consumerSession.createConsumer(queue);
+ _consumerConnection.start();
+ receiveAndValidateMessage(consumer, "1");
+ receiveAndValidateMessage(consumer, "2");
+ receiveAndValidateMessage(consumer, "3");
+
+ sendAndCommitMessage(producer,"4");
+
+ receiveAndValidateMessage(consumer, "4");
+
+ sendAndCommitMessage(producer,"7");
+ sendAndCommitMessage(producer,"6");
+ sendAndCommitMessage(producer,"5");
+
+ // pre-fetch causes "unexpected" order
+ receiveAndValidateMessage(consumer, "7");
+ receiveAndValidateMessage(consumer, "5");
+ receiveAndValidateMessage(consumer, "6");
+ }
+
+ public void testGetNextPreFetch() throws JMSException, NamingException, AMQException
+ {
+ final Queue queue = createQueue();
+ final MessageProducer producer = _producerSession.createProducer(queue);
+
+ sendAndCommitMessage(producer,"1");
+
+ final Session consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final MessageConsumer consumer = consumerSession.createConsumer(queue);
+ _consumerConnection.start();
+ receiveAndValidateMessage(consumer, "1");
+
+ producer.send(getSortableTestMesssage("4"));
+ producer.send(getSortableTestMesssage("3"));
+ producer.send(getSortableTestMesssage("2"));
+ _producerSession.commit();
+
+ // pre-fetch causes "unexpected" order
+ receiveAndValidateMessage(consumer, "4");
+ receiveAndValidateMessage(consumer, "2");
+ receiveAndValidateMessage(consumer, "3");
+ }
+
+ public void testGetNextWithAck() throws JMSException, NamingException, AMQException
+ {
+ Queue _queue = createQueue();
+ MessageProducer producer = _producerSession.createProducer(_queue);
+ Message received = null;
+
+ //Send 3 out of order
+ sendAndCommitMessage(producer,"2");
+ sendAndCommitMessage(producer,"3");
+ sendAndCommitMessage(producer,"1");
+
+ final Session consumerSession = _consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ final MessageConsumer consumer = consumerSession.createConsumer(_queue);
+ _consumerConnection.start();
+
+ //Receive 3 in sorted order
+ received = receiveAndValidateMessage(consumer, "1");
+ received.acknowledge();
+ received = receiveAndValidateMessage(consumer, "2");
+ received.acknowledge();
+ received = receiveAndValidateMessage(consumer, "3");
+ received.acknowledge();
+
+ //Send 1
+ sendAndCommitMessage(producer,"4");
+
+ //Receive 1 and recover
+ received = receiveAndValidateMessage(consumer, "4");
+ consumerSession.recover();
+
+ //Receive same 1
+ received = receiveAndValidateMessage(consumer, "4");
+ received.acknowledge();
+
+ //Send 3 out of order
+ sendAndCommitMessage(producer,"7");
+ sendAndCommitMessage(producer,"6");
+ sendAndCommitMessage(producer,"5");
+
+ //Receive 1 of 3 (out of order due to pre-fetch) and recover
+ received = receiveAndValidateMessage(consumer, "7");
+ consumerSession.recover();
+
+ if (isBroker010())
+ {
+ //Receive 3 in sorted order (not as per JMS recover)
+ received = receiveAndValidateMessage(consumer, "5");
+ received.acknowledge();
+ received = receiveAndValidateMessage(consumer, "6");
+ received.acknowledge();
+ received = receiveAndValidateMessage(consumer, "7");
+ received.acknowledge();
+ }
+ else
+ {
+ //Receive 3 in partial sorted order due to recover
+ received = receiveAndValidateMessage(consumer, "7");
+ received.acknowledge();
+ received = receiveAndValidateMessage(consumer, "5");
+ received.acknowledge();
+ received = receiveAndValidateMessage(consumer, "6");
+ received.acknowledge();
+ }
+ }
+
+ protected Queue createQueue() throws AMQException, JMSException
+ {
+ final Map<String, Object> arguments = new HashMap<String, Object>();
+ arguments.put(AMQQueueFactory.QPID_QUEUE_SORT_KEY, TEST_SORT_KEY);
+ ((AMQSession<?,?>) _producerSession).createQueue(new AMQShortString(getTestQueueName()), false, true, false, arguments);
+ final Queue queue = new AMQQueue("amq.direct", getTestQueueName());
+ ((AMQSession<?,?>) _producerSession).declareAndBind((AMQDestination) queue);
+ return queue;
+ }
+
+ private Message getSortableTestMesssage(final String key) throws JMSException
+ {
+ final Message msg = _producerSession.createTextMessage("Message Text: Key Value" + key);
+ msg.setStringProperty(TEST_SORT_KEY, key);
+ return msg;
+ }
+
+ private void sendAndCommitMessage(final MessageProducer producer, final String keyValue) throws JMSException
+ {
+ producer.send(getSortableTestMesssage(keyValue));
+ _producerSession.commit();
+ }
+
+ private Message receiveAndValidateMessage(final MessageConsumer consumer, final String expectedKey) throws JMSException
+ {
+ final Message received = (TextMessage) consumer.receive(10000);
+ assertNotNull("Received message is unexpectedly null", received);
+ assertEquals("Received message with unexpected sorted key value", expectedKey,
+ received.getStringProperty(TEST_SORT_KEY));
+ return received;
+ }
+
+ private class TestConsumerThread extends Thread
+ {
+ private boolean _stopped = false;
+ private int _count = 0;
+ private int _consumed = 0;
+ private int _sessionType = Session.AUTO_ACKNOWLEDGE;
+ private Queue _queue;
+
+ public TestConsumerThread(final int sessionType, final Queue queue)
+ {
+ _sessionType = sessionType;
+ _queue = queue;
+ }
+
+ public void run()
+ {
+ try
+ {
+ Connection conn = null;
+ try
+ {
+ conn = getConnection();
+ }
+ catch(Exception e)
+ {
+ fail("Could not get connection");
+ }
+
+ final Session session = conn.createSession((_sessionType == Session.SESSION_TRANSACTED ? true : false),
+ _sessionType);
+ final MessageConsumer consumer = session.createConsumer(_queue);
+
+ conn.start();
+
+ TextMessage msg;
+ Calendar cal = Calendar.getInstance(Locale.UK);
+ while((msg = (TextMessage) consumer.receive(1000)) != null)
+ {
+ if(_sessionType == Session.SESSION_TRANSACTED)
+ {
+ if (_count%10 == 0)
+ {
+ LOGGER.debug("transacted session rollback");
+ session.rollback();
+ }
+ else
+ {
+ LOGGER.debug("transacted session commit");
+ session.commit();
+ _consumed++;
+ }
+ }
+ else if(_sessionType == Session.CLIENT_ACKNOWLEDGE)
+ {
+ if (_count%10 == 0)
+ {
+ LOGGER.debug("client ack session recover");
+ session.recover();
+ }
+ else
+ {
+ LOGGER.debug("client ack session acknowledge");
+ msg.acknowledge();
+ _consumed++;
+ }
+ }
+ else
+ {
+ LOGGER.debug("auto ack session");
+ _consumed++;
+ }
+
+ _count++;
+ LOGGER.debug("Message consumed at : " + cal.getTimeInMillis());
+ LOGGER.debug("Message consumed with key: " + msg.getStringProperty(TEST_SORT_KEY));
+ LOGGER.debug("Message consumed with text: " + msg.getText());
+ LOGGER.debug("Message consumed with consumed index: " + _consumed);
+ }
+
+ _stopped = true;
+ session.close();
+ conn.close();
+ }
+ catch(JMSException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ public synchronized boolean isStopped()
+ {
+ return _stopped;
+ }
+
+ public synchronized int getConsumed()
+ {
+ return _consumed;
+ }
+ }
+
+ private static class AscendingSortedKeys
+ {
+ public static final String[] KEYS = { "Ul4a1", "WaWsv", "2Yz7E", "ix74r", "okgRi", "HlUbF", "LewvM", "lweGy",
+ "TXQ0Z", "0Kyfs", "s7Mxk", "dmoS7", "8RCUA", "W3VFH", "aez9y", "uQIcz", "0h1b1", "cmXIX",
+ "4dEz6", "zHF1q", "D6rBy", "5drc6", "0BmCy", "BCxeC", "t59lR", "aL6AJ", "OHaBz", "WmadA",
+ "B3qem", "CxVEf", "AIYUu", "uJScX", "uoStw", "ogLgc", "AgJHQ", "hUTw7", "Rxrsm", "9GXkX",
+ "7hyVv", "y94nw", "Twano", "TCgPp", "pFrrl", "POUYS", "L7cGc", "0ao3l", "CNHmv", "MaJQs",
+ "OUqFM", "jeskS", "FPfSE", "v1Hln", "14FLR", "KZamH", "G1RhS", "FVMxo", "rKDLJ", "nnP8o",
+ "nFqik", "zmLYD", "1j5L8", "e6e4z", "WDVWJ", "aDGtS", "fcwDa", "nlaBy", "JJs5m", "vLsmS",
+ "No0Qb", "JXljW", "Waim6", "MezSW", "l83Ud", "SjskQ", "uPX7G", "5nmWv", "ZhwG1", "uTacx",
+ "t98iW", "JkzUn", "fmIK1", "i7WMQ", "bgJAz", "n1pmO", "jS1aj", "4W0Tl", "Yf2Ec", "sqVrf",
+ "MojnP", "qQxHP", "pWiOs", "yToGW", "kB5nP", "BpYhV", "Cfgr3", "zbIYY", "VLTy6", "he9IA",
+ "lm0pD", "WreyP", "8hJdt", "QnJ1S", "n8pJ9", "iqv4k", "OUYuF", "8cVD3", "sx5Gl", "cQOnv",
+ "wiHrZ", "oGu6x", "7fsYM", "gf8rI", "7fKYU", "pT8wu", "lCMxy", "prNT6", "5Drn0", "guMb8",
+ "OxWIH", "uZPqg", "SbRYy", "In3NS", "uvf7A", "FLsph", "pmeCd", "BbwgA", "ru4UG", "YOfrY",
+ "W7cTs", "K4GS8", "AOgEe", "618Di", "dpe1v", "3otm6", "oVQp6", "5Mg9r", "Y1mC0", "VIlwP",
+ "aFFss", "Mkgy8", "pv0i7", "S77LH", "XyPZN", "QYxC0", "vkCHH", "MGlTF", "24ARF", "v2eC3",
+ "ZUnqt", "HfyNQ", "FjHXR", "45cIH", "1LB1L", "zqH0W", "fLNg8", "oQ87r", "Cp3mZ", "Zv7z0",
+ "O3iyQ", "EOE1o", "5ZaEz", "tlILt", "MmsIo", "lXFOB", "gtCA5", "yEfy9", "7X3uy", "d7vjM",
+ "XflUq", "Fhtgl", "NOHsz", "GWqqX", "xciqp", "BFkb8", "P6bcg", "lViBv", "2TRI7", "2hEEU",
+ "9XyT9", "29QAz", "U3yw5", "FxX9q", "C2Irc", "8U2nU", "m4bxU", "5iGN5", "mX2GE", "cShY2",
+ "JRJQB", "yvOMI", "4QMc9", "NAFuw", "RmDcr", "faHir", "2ZHdk", "zY1GY", "a00b5", "ZuDtD",
+ "JIqXi", "K20wK", "gdQsS", "5Namm", "lkMUA", "IBe8k", "FcWrW", "FFDui", "tuDyS", "ZJTXH",
+ "AkKTk", "zQt6Q", "FNYIM", "RpBQm", "RsQUq", "Mm8si", "gjUTu", "zz4ZU", "jiVBP", "ReKEW",
+ "5VZjS", "YjB9t", "zFgtB", "8TxD7", "euZA5", "MK07Y", "CK5W7", "16lHc", "6q6L9", "Z4I1v",
+ "UlU3M", "SWfou", "0PktI", "55rfB", "jfREu", "580YD", "Uvlv4", "KASQ8", "AmdQd", "piJSk",
+ "hE1Ql", "LDk6f", "NcICA", "IKxdL", "iwzGk", "uN6r3", "lsQGo", "QClRL", "iKqhr", "FGzgp",
+ "RkQke", "b29RJ", "CIShG", "9eoRc", "F6PT2", "LbRTH", "M3zXL", "GXdoH", "IjTwP", "RBhp0",
+ "yluBx", "mz8gx", "MmKGJ", "Q6Lix", "uupzk", "RACuj", "d85a9", "qaofN", "kZANm", "jtn0X",
+ "lpF6W", "suY4x", "rz7Ut", "wDajX", "1v5hH", "Yw2oU", "ksJby", "WMiS3", "lj07Q", "EdBKc",
+ "6AFT0", "0YAGH", "ThjNn", "JKWYR", "9iGoT", "UmaEv", "3weIF", "CdyBV", "pAhR1", "djsrv",
+ "xReec", "8FmFH", "Dz1R3", "Ta8f6", "DG4sT", "VjCZq", "jSjS3", "Pb1pa", "VNCPd", "Kr8ys",
+ "AXpwE", "ZzJHW", "Nxx9V", "jzUqR", "dhSuH", "DQimp", "07n1c", "HP433", "RzaZA", "cL0aE",
+ "Ss0Zu", "FnPFB", "7lUXZ", "9rlg9", "lH1kt", "ni2v1", "48cHL", "soy9t", "WPmlx", "2Nslm",
+ "hSSvQ", "9y4lw", "ulk41", "ECMvU", "DLhzM", "GrDg7", "x3LDe", "QChxs", "xXTI4", "Gv3Fq",
+ "rhl0J", "QssNC", "brhlQ", "s93Ml", "tl72W", "pvgjS", "Qworu", "DcpWB", "X6Pin", "J2mQi",
+ "BGaQY", "CqqaD", "NhXdu", "dQ586", "Yh1hF", "HRxd8", "PYBf4", "64s8N", "tvdkD", "azIWp",
+ "tAOsr", "v8yFN", "h1zcH", "SmGzv", "bZLvS", "fFDrJ", "Oz8yZ", "0Wr5y", "fcJOy", "7ku1p",
+ "QbxXc", "VerEA", "QWxoT", "hYBCK", "o8Uyd", "FwEJz", "hi5X7", "uAWyp", "I7p2a", "M6qcG",
+ "gIYvE", "HzZT8", "iB08l", "StlDJ", "tjQxs", "k85Ae", "taOXK", "s4786", "2DREs", "atef2",
+ "Vprf2", "VBjhz", "EoToP", "blLA9", "qUJMd", "ydG8U", "8xEKz", "uLtKs", "GSQwj", "S2Dfu",
+ "ciuWz", "i3pyd", "7Ow5C", "IRh48", "vOqCE", "Q6hMC", "yofH3", "KsjRK", "5IhmG", "fqypy",
+ "0MR5X", "Chuy3" };
+
+ private static int _i = 0;
+ private static int _j = 0;
+
+ static
+ {
+ Arrays.sort(KEYS);
+ }
+
+ public static String getNextKey()
+ {
+ if(_j == KEYS.length)
+ {
+ _j = 0;
+ _i++;
+ if(_i == KEYS.length)
+ {
+ _i = 0;
+ }
+ }
+ return new StringBuffer().append(KEYS[_i]).append("-").append(KEYS[_j++]).toString();
+ }
+ }
+}
diff --git a/java/test-profiles/JavaPre010Excludes b/java/test-profiles/JavaPre010Excludes
index 68feaf1e2b..057d7c2c44 100644
--- a/java/test-profiles/JavaPre010Excludes
+++ b/java/test-profiles/JavaPre010Excludes
@@ -33,6 +33,7 @@ org.apache.qpid.test.client.destination.AddressBasedDestinationTest#*
org.apache.qpid.test.client.queue.QueuePolicyTest#testRingPolicy
org.apache.qpid.test.client.queue.QueuePolicyTest#testRejectPolicy
org.apache.qpid.test.unit.message.JMSPropertiesTest#testApplicationProperties
+org.apache.qpid.server.queue.AddressBasedSortedQueueTest#*
// Those tests are written against the 0.10 path
org.apache.qpid.test.unit.message.UTF8Test#*