summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2011-11-20 23:37:48 +0000
committerKeith Wall <kwall@apache.org>2011-11-20 23:37:48 +0000
commit26df8c6fd2959210deab55d0c57a2f98ef7d6542 (patch)
tree9e378550276888ddd766085bad93994f3f892f17 /qpid/java/broker/src/main
parent10a2d3074f254794ae8443994e60b0e4e8c0b56a (diff)
downloadqpid-python-26df8c6fd2959210deab55d0c57a2f98ef7d6542.tar.gz
QPID-3622: Add Sorted Queue funtionality
Applied patch from Andrew MacBean <andymacbean@gmail.com> and Oleksandr Rudyy<orudyy@gmail.com> git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1204295 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/main')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java52
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java53
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java38
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java65
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java12
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java71
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java83
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java30
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryImpl.java143
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java665
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java19
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java13
17 files changed, 1126 insertions, 207 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
index 4512de6fb4..31c683b548 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
@@ -62,7 +62,8 @@ public class QueueConfiguration extends ConfigurationPlugin
"capacity",
"flowResumeCapacity",
"lvq",
- "lvqKey"
+ "lvqKey",
+ "sortKey"
};
}
@@ -167,6 +168,10 @@ public class QueueConfiguration extends ConfigurationPlugin
return getStringValue("lvqKey", null);
}
+ public String getQueueSortKey()
+ {
+ return getStringValue("sortKey", null);
+ }
public static class QueueConfig extends ConfigurationPlugin
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
index 371ae0de50..2c04a626ff 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
@@ -20,71 +20,25 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionList;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
import java.util.Map;
+import org.apache.qpid.server.virtualhost.VirtualHost;
-public class AMQPriorityQueue extends SimpleAMQQueue
+public class AMQPriorityQueue extends OutOfOrderQueue
{
- protected AMQPriorityQueue(final AMQShortString name,
- final boolean durable,
- final AMQShortString owner,
- final boolean autoDelete,
- boolean exclusive,
- final VirtualHost virtualHost,
- int priorities, Map<String, Object> arguments)
- {
- super(name, durable, owner, autoDelete, exclusive, virtualHost,new PriorityQueueList.Factory(priorities), arguments);
- }
-
- public AMQPriorityQueue(String queueName,
- boolean durable,
- String owner,
- boolean autoDelete,
- boolean exclusive, VirtualHost virtualHost, int priorities, Map<String,Object> arguments)
+ protected AMQPriorityQueue(final String name,
+ final boolean durable,
+ final String owner,
+ final boolean autoDelete,
+ boolean exclusive,
+ final VirtualHost virtualHost,
+ Map<String, Object> arguments,
+ int priorities)
{
- this(queueName == null ? null : new AMQShortString(queueName), durable, owner == null ? null : new AMQShortString(owner),
- autoDelete, exclusive,virtualHost, priorities, arguments);
+ super(name, durable, owner, autoDelete, exclusive, virtualHost, new PriorityQueueList.Factory(priorities), arguments);
}
public int getPriorities()
{
return ((PriorityQueueList) _entries).getPriorities();
}
-
- @Override
- protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
- {
- // check that all subscriptions are not in advance of the entry
- SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator();
- while(subIter.advance() && entry.isAvailable())
- {
- final Subscription subscription = subIter.getNode().getSubscription();
- if(!subscription.isClosed())
- {
- QueueContext context = (QueueContext) subscription.getQueueContext();
- if(context != null)
- {
- QueueEntry subnode = context._lastSeenEntry;
- QueueEntry released = context._releasedEntry;
- while(subnode != null && entry.compareTo(subnode) < 0 && entry.isAvailable() && (released == null || released.compareTo(entry) < 0))
- {
- if(QueueContext._releasedUpdater.compareAndSet(context,released,entry))
- {
- break;
- }
- else
- {
- subnode = context._lastSeenEntry;
- released = context._releasedEntry;
- }
- }
- }
- }
-
- }
- }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
index bee55118ba..e4a6f01930 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
@@ -20,22 +20,22 @@
*/
package org.apache.qpid.server.queue;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.configuration.QueueConfiguration;
-
-import java.util.Map;
-import java.util.HashMap;
+import org.apache.qpid.server.virtualhost.VirtualHost;
public class AMQQueueFactory
{
- public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities");
+ public static final String X_QPID_PRIORITIES = "x-qpid-priorities";
public static final String QPID_LVQ_KEY = "qpid.LVQ_key";
public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue";
public static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key";
+ public static final String QPID_QUEUE_SORT_KEY = "qpid.queue_sort_key";
private abstract static class QueueProperty
{
@@ -157,9 +157,11 @@ public class AMQQueueFactory
String description = "Permission denied: queue-name '" + queueName + "'";
throw new AMQSecurityException(description);
}
-
+
int priorities = 1;
String conflationKey = null;
+ String sortingKey = null;
+
if(arguments != null)
{
if(arguments.containsKey(QPID_LAST_VALUE_QUEUE) || arguments.containsKey(QPID_LAST_VALUE_QUEUE_KEY))
@@ -170,24 +172,32 @@ public class AMQQueueFactory
conflationKey = QPID_LVQ_KEY;
}
}
- else if(arguments.containsKey(X_QPID_PRIORITIES.toString()))
+ else if(arguments.containsKey(X_QPID_PRIORITIES))
{
- Object prioritiesObj = arguments.get(X_QPID_PRIORITIES.toString());
+ Object prioritiesObj = arguments.get(X_QPID_PRIORITIES);
if(prioritiesObj instanceof Number)
{
priorities = ((Number)prioritiesObj).intValue();
}
}
+ else if(arguments.containsKey(QPID_QUEUE_SORT_KEY))
+ {
+ sortingKey = (String)arguments.get(QPID_QUEUE_SORT_KEY);
+ }
}
AMQQueue q;
- if(conflationKey != null)
+ if(sortingKey != null)
+ {
+ q = new SortedQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, sortingKey);
+ }
+ else if(conflationKey != null)
{
q = new ConflationQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, conflationKey);
}
else if(priorities > 1)
{
- q = new AMQPriorityQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, priorities, arguments);
+ q = new AMQPriorityQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, priorities);
}
else
{
@@ -223,26 +233,22 @@ public class AMQQueueFactory
boolean exclusive = config.getExclusive();
String owner = config.getOwner();
Map<String,Object> arguments = null;
+
if(config.isLVQ() || config.getLVQKey() != null)
{
-
arguments = new HashMap<String,Object>();
arguments.put(QPID_LAST_VALUE_QUEUE, 1);
arguments.put(QPID_LAST_VALUE_QUEUE_KEY, config.getLVQKey() == null ? QPID_LVQ_KEY : config.getLVQKey());
}
- else
+ else if (config.getPriority() || config.getPriorities() > 0)
{
- boolean priority = config.getPriority();
- int priorities = config.getPriorities();
- if(priority || priorities > 0)
- {
- arguments = new HashMap<String,Object>();
- if (priorities < 0)
- {
- priorities = 10;
- }
- arguments.put("x-qpid-priorities", priorities);
- }
+ arguments = new HashMap<String,Object>();
+ arguments.put(X_QPID_PRIORITIES, config.getPriorities() < 0 ? 10 : config.getPriorities());
+ }
+ else if (config.getQueueSortKey() != null && !"".equals(config.getQueueSortKey()))
+ {
+ arguments = new HashMap<String,Object>();
+ arguments.put(QPID_QUEUE_SORT_KEY, config.getQueueSortKey());
}
AMQQueue q = createAMQQueueImpl(queueName, durable, owner, autodelete, exclusive, host, arguments);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
index 2c1883e763..c4762c98c9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
@@ -54,7 +54,7 @@ public class ConflationQueueList extends SimpleQueueEntryList
@Override
- public QueueEntry add(final ServerMessage message)
+ public ConflationQueueEntry add(final ServerMessage message)
{
ConflationQueueEntry entry = (ConflationQueueEntry) (super.add(message));
AtomicReference<QueueEntry> latestValueReference = null;
@@ -117,7 +117,7 @@ public class ConflationQueueList extends SimpleQueueEntryList
}
}
- private final class ConflationQueueEntry extends QueueEntryImpl
+ private final class ConflationQueueEntry extends SimpleQueueEntryImpl
{
@@ -158,7 +158,7 @@ public class ConflationQueueList extends SimpleQueueEntryList
_conflationKey = conflationKey;
}
- public QueueEntryList createQueueEntryList(AMQQueue queue)
+ public ConflationQueueList createQueueEntryList(AMQQueue queue)
{
return new ConflationQueueList(queue, _conflationKey);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
new file mode 100644
index 0000000000..b16d1eb8e3
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
@@ -0,0 +1,53 @@
+package org.apache.qpid.server.queue;
+
+import java.util.Map;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionList;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public abstract class OutOfOrderQueue extends SimpleAMQQueue
+{
+ protected OutOfOrderQueue(String name, boolean durable, String owner,
+ boolean autoDelete, boolean exclusive, VirtualHost virtualHost,
+ QueueEntryListFactory entryListFactory, Map<String, Object> arguments)
+ {
+ super(name, durable, owner, autoDelete, exclusive, virtualHost, entryListFactory, arguments);
+ }
+
+ @Override
+ protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
+ {
+ // check that all subscriptions are not in advance of the entry
+ SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator();
+ while(subIter.advance() && !entry.isAcquired())
+ {
+ final Subscription subscription = subIter.getNode().getSubscription();
+ if(!subscription.isClosed())
+ {
+ QueueContext context = (QueueContext) subscription.getQueueContext();
+ if(context != null)
+ {
+ QueueEntry subnode = context._lastSeenEntry;
+ QueueEntry released = context._releasedEntry;
+
+ while(subnode != null && entry.compareTo(subnode) < 0 && !entry.isAcquired()
+ && (released == null || released.compareTo(entry) > 0))
+ {
+ if(QueueContext._releasedUpdater.compareAndSet(context,released,entry))
+ {
+ break;
+ }
+ else
+ {
+ subnode = context._lastSeenEntry;
+ released = context._releasedEntry;
+ }
+
+ }
+ }
+ }
+
+ }
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
index 0c6b84d2b6..79d3ab5bd0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
@@ -20,21 +20,19 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.framing.CommonContentHeaderProperties;
-import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.ServerMessage;
-public class PriorityQueueList implements QueueEntryList
+public class PriorityQueueList implements QueueEntryList<SimpleQueueEntryImpl>
{
private final AMQQueue _queue;
- private final QueueEntryList[] _priorityLists;
+ private final SimpleQueueEntryList[] _priorityLists;
private final int _priorities;
private final int _priorityOffset;
public PriorityQueueList(AMQQueue queue, int priorities)
{
_queue = queue;
- _priorityLists = new QueueEntryList[priorities];
+ _priorityLists = new SimpleQueueEntryList[priorities];
_priorities = priorities;
_priorityOffset = 5-((priorities + 1)/2);
for(int i = 0; i < priorities; i++)
@@ -53,7 +51,7 @@ public class PriorityQueueList implements QueueEntryList
return _queue;
}
- public QueueEntry add(ServerMessage message)
+ public SimpleQueueEntryImpl add(ServerMessage message)
{
int index = message.getMessageHeader().getPriority() - _priorityOffset;
if(index >= _priorities)
@@ -68,31 +66,30 @@ public class PriorityQueueList implements QueueEntryList
}
- public QueueEntry next(QueueEntry node)
+ public SimpleQueueEntryImpl next(SimpleQueueEntryImpl node)
{
- QueueEntryImpl nodeImpl = (QueueEntryImpl)node;
- QueueEntry next = nodeImpl.getNext();
+ SimpleQueueEntryImpl next = node.getNextValidEntry();
if(next == null)
{
- QueueEntryList nodeEntryList = nodeImpl.getQueueEntryList();
+ final QueueEntryList<?> nodeEntryList = node.getQueueEntryList();
int index;
for(index = _priorityLists.length-1; _priorityLists[index] != nodeEntryList; index--);
while(next == null && index != 0)
{
index--;
- next = ((QueueEntryImpl)_priorityLists[index].getHead()).getNext();
+ next = _priorityLists[index].getHead().getNextValidEntry();
}
}
return next;
}
- private final class PriorityQueueEntryListIterator implements QueueEntryIterator
+ private final class PriorityQueueEntryListIterator implements QueueEntryIterator<SimpleQueueEntryImpl>
{
- private final QueueEntryIterator[] _iterators = new QueueEntryIterator[ _priorityLists.length ];
- private QueueEntry _lastNode;
+ private final SimpleQueueEntryList.QueueEntryIteratorImpl[] _iterators = new SimpleQueueEntryList.QueueEntryIteratorImpl[ _priorityLists.length ];
+ private SimpleQueueEntryImpl _lastNode;
PriorityQueueEntryListIterator()
{
@@ -116,7 +113,7 @@ public class PriorityQueueList implements QueueEntryList
return true;
}
- public QueueEntry getNode()
+ public SimpleQueueEntryImpl getNode()
{
return _lastNode;
}
@@ -135,16 +132,21 @@ public class PriorityQueueList implements QueueEntryList
}
}
- public QueueEntryIterator iterator()
+ public PriorityQueueEntryListIterator iterator()
{
return new PriorityQueueEntryListIterator();
}
- public QueueEntry getHead()
+ public SimpleQueueEntryImpl getHead()
{
return _priorityLists[_priorities-1].getHead();
}
+ public void entryDeleted(final SimpleQueueEntryImpl queueEntry)
+ {
+
+ }
+
static class Factory implements QueueEntryListFactory
{
private final int _priorities;
@@ -154,7 +156,7 @@ public class PriorityQueueList implements QueueEntryList
_priorities = priorities;
}
- public QueueEntryList createQueueEntryList(AMQQueue queue)
+ public PriorityQueueList createQueueEntryList(AMQQueue queue)
{
return new PriorityQueueList(queue, _priorities);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index be29245901..c1fb0258fa 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -214,6 +214,10 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable
boolean isQueueDeleted();
+ QueueEntry getNextNode();
+
+ QueueEntry getNextValidEntry();
+
void addStateChangeListener(StateChangeListener listener);
boolean removeStateChangeListener(StateChangeListener listener);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 3d011b99c0..ee1d214c1f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -38,16 +38,11 @@ import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-
-public class QueueEntryImpl implements QueueEntry
+public abstract class QueueEntryImpl implements QueueEntry
{
-
- /**
- * Used for debugging purposes.
- */
private static final Logger _log = Logger.getLogger(QueueEntryImpl.class);
- private final SimpleQueueEntryList _queueEntryList;
+ private final QueueEntryList _queueEntryList;
private MessageReference _message;
@@ -80,22 +75,20 @@ public class QueueEntryImpl implements QueueEntry
private volatile long _entryId;
- volatile QueueEntryImpl _next;
-
private static final int DELIVERED_TO_CONSUMER = 1;
private static final int REDELIVERED = 2;
private volatile int _deliveryState;
- QueueEntryImpl(SimpleQueueEntryList queueEntryList)
+ public QueueEntryImpl(QueueEntryList<?> queueEntryList)
{
this(queueEntryList,null,Long.MIN_VALUE);
_state = DELETED_STATE;
}
- public QueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message, final long entryId)
+ public QueueEntryImpl(QueueEntryList<?> queueEntryList, ServerMessage message, final long entryId)
{
_queueEntryList = queueEntryList;
@@ -104,7 +97,7 @@ public class QueueEntryImpl implements QueueEntry
_entryIdUpdater.set(this, entryId);
}
- public QueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message)
+ public QueueEntryImpl(QueueEntryList<?> queueEntryList, ServerMessage message)
{
_queueEntryList = queueEntryList;
_message = message == null ? null : message.newReference();
@@ -316,16 +309,15 @@ public class QueueEntryImpl implements QueueEntry
public Subscription getDeliveredSubscription()
{
- EntryState state = _state;
- if (state instanceof SubscriptionAcquiredState)
- {
- return ((SubscriptionAcquiredState) state).getSubscription();
- }
- else
- {
- return null;
- }
-
+ EntryState state = _state;
+ if (state instanceof SubscriptionAcquiredState)
+ {
+ return ((SubscriptionAcquiredState) state).getSubscription();
+ }
+ else
+ {
+ return null;
+ }
}
public void reject()
@@ -497,33 +489,6 @@ public class QueueEntryImpl implements QueueEntry
return getEntryId() > other.getEntryId() ? 1 : getEntryId() < other.getEntryId() ? -1 : 0;
}
- public QueueEntryImpl getNext()
- {
-
- QueueEntryImpl next = nextNode();
- while(next != null && next.isDispensed() )
- {
-
- final QueueEntryImpl newNext = next.nextNode();
- if(newNext != null)
- {
- SimpleQueueEntryList._nextUpdater.compareAndSet(this,next, newNext);
- next = nextNode();
- }
- else
- {
- next = null;
- }
-
- }
- return next;
- }
-
- QueueEntryImpl nextNode()
- {
- return _next;
- }
-
public boolean isDeleted()
{
return _state == DELETED_STATE;
@@ -535,7 +500,7 @@ public class QueueEntryImpl implements QueueEntry
if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
{
- _queueEntryList.advanceHead();
+ _queueEntryList.entryDeleted(this);
return true;
}
else
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java
index c5c115a2d1..73ebb0f300 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java
@@ -20,11 +20,11 @@
*/
package org.apache.qpid.server.queue;
-public interface QueueEntryIterator
+public interface QueueEntryIterator<QE extends QueueEntry>
{
boolean atTail();
- QueueEntry getNode();
+ QE getNode();
boolean advance();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
index b4042ce02c..77c4b912e0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
@@ -22,15 +22,17 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.message.ServerMessage;
-public interface QueueEntryList
+public interface QueueEntryList<Q extends QueueEntry>
{
AMQQueue getQueue();
- QueueEntry add(ServerMessage message);
+ Q add(ServerMessage message);
- QueueEntry next(QueueEntry node);
+ Q next(Q node);
- QueueEntryIterator iterator();
+ QueueEntryIterator<Q> iterator();
- QueueEntry getHead();
+ Q getHead();
+
+ void entryDeleted(Q queueEntry);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java
new file mode 100644
index 0000000000..0707dc045c
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.message.ServerMessage;
+
+public class SimpleQueueEntryImpl extends QueueEntryImpl
+{
+ volatile SimpleQueueEntryImpl _next;
+
+ public SimpleQueueEntryImpl(SimpleQueueEntryList queueEntryList)
+ {
+ super(queueEntryList);
+ }
+
+ public SimpleQueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message, final long entryId)
+ {
+ super(queueEntryList, message, entryId);
+ }
+
+ public SimpleQueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message)
+ {
+ super(queueEntryList, message);
+ }
+
+ public SimpleQueueEntryImpl getNextNode()
+ {
+ return _next;
+ }
+
+ public SimpleQueueEntryImpl getNextValidEntry()
+ {
+
+ SimpleQueueEntryImpl next = getNextNode();
+ while(next != null && next.isDispensed())
+ {
+
+ final SimpleQueueEntryImpl newNext = next.getNextNode();
+ if(newNext != null)
+ {
+ SimpleQueueEntryList._nextUpdater.compareAndSet(this,next, newNext);
+ next = getNextNode();
+ }
+ else
+ {
+ next = null;
+ }
+
+ }
+ return next;
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
index 46baab8c85..0bb5dcc219 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
@@ -1,10 +1,3 @@
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.message.ServerMessage;
-
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import java.util.concurrent.atomic.AtomicLong;
-
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -25,25 +18,31 @@ import java.util.concurrent.atomic.AtomicLong;
* under the License.
*
*/
-public class SimpleQueueEntryList implements QueueEntryList
+package org.apache.qpid.server.queue;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.apache.qpid.server.message.ServerMessage;
+
+public class SimpleQueueEntryList implements QueueEntryList<SimpleQueueEntryImpl>
{
- private final QueueEntryImpl _head;
+ private final SimpleQueueEntryImpl _head;
- private volatile QueueEntryImpl _tail;
+ private volatile SimpleQueueEntryImpl _tail;
- static final AtomicReferenceFieldUpdater<SimpleQueueEntryList, QueueEntryImpl>
+ static final AtomicReferenceFieldUpdater<SimpleQueueEntryList, SimpleQueueEntryImpl>
_tailUpdater =
AtomicReferenceFieldUpdater.newUpdater
- (SimpleQueueEntryList.class, QueueEntryImpl.class, "_tail");
+ (SimpleQueueEntryList.class, SimpleQueueEntryImpl.class, "_tail");
private final AMQQueue _queue;
- static final AtomicReferenceFieldUpdater<QueueEntryImpl, QueueEntryImpl>
+ static final AtomicReferenceFieldUpdater<SimpleQueueEntryImpl, SimpleQueueEntryImpl>
_nextUpdater =
AtomicReferenceFieldUpdater.newUpdater
- (QueueEntryImpl.class, QueueEntryImpl.class, "_next");
+ (SimpleQueueEntryImpl.class, SimpleQueueEntryImpl.class, "_next");
private AtomicLong _scavenges = new AtomicLong(0L);
private final long _scavengeCount = Integer.getInteger("qpid.queue.scavenge_count", 50);
@@ -52,14 +51,14 @@ public class SimpleQueueEntryList implements QueueEntryList
public SimpleQueueEntryList(AMQQueue queue)
{
_queue = queue;
- _head = new QueueEntryImpl(this);
+ _head = new SimpleQueueEntryImpl(this);
_tail = _head;
}
void advanceHead()
{
- QueueEntryImpl next = _head.nextNode();
- QueueEntryImpl newNext = _head.getNext();
+ SimpleQueueEntryImpl next = _head.getNextNode();
+ SimpleQueueEntryImpl newNext = _head.getNextValidEntry();
if (next == newNext)
{
@@ -73,11 +72,11 @@ public class SimpleQueueEntryList implements QueueEntryList
void scavenge()
{
- QueueEntryImpl next = _head.getNext();
+ SimpleQueueEntryImpl next = _head.getNextValidEntry();
while (next != null)
{
- next = next.getNext();
+ next = next.getNextValidEntry();
}
}
@@ -88,13 +87,13 @@ public class SimpleQueueEntryList implements QueueEntryList
}
- public QueueEntry add(ServerMessage message)
+ public SimpleQueueEntryImpl add(ServerMessage message)
{
- QueueEntryImpl node = createQueueEntry(message);
+ SimpleQueueEntryImpl node = createQueueEntry(message);
for (;;)
{
- QueueEntryImpl tail = _tail;
- QueueEntryImpl next = tail.nextNode();
+ SimpleQueueEntryImpl tail = _tail;
+ SimpleQueueEntryImpl next = tail.getNextNode();
if (tail == _tail)
{
if (next == null)
@@ -115,23 +114,22 @@ public class SimpleQueueEntryList implements QueueEntryList
}
}
- protected QueueEntryImpl createQueueEntry(ServerMessage message)
+ protected SimpleQueueEntryImpl createQueueEntry(ServerMessage message)
{
- return new QueueEntryImpl(this, message);
+ return new SimpleQueueEntryImpl(this, message);
}
- public QueueEntry next(QueueEntry node)
+ public SimpleQueueEntryImpl next(SimpleQueueEntryImpl node)
{
- return ((QueueEntryImpl)node).getNext();
+ return node.getNextValidEntry();
}
-
- public static class QueueEntryIteratorImpl implements QueueEntryIterator
+ public static class QueueEntryIteratorImpl implements QueueEntryIterator<SimpleQueueEntryImpl>
{
- private QueueEntryImpl _lastNode;
+ private SimpleQueueEntryImpl _lastNode;
- QueueEntryIteratorImpl(QueueEntryImpl startNode)
+ QueueEntryIteratorImpl(SimpleQueueEntryImpl startNode)
{
_lastNode = startNode;
}
@@ -139,14 +137,12 @@ public class SimpleQueueEntryList implements QueueEntryList
public boolean atTail()
{
- return _lastNode.nextNode() == null;
+ return _lastNode.getNextNode() == null;
}
- public QueueEntry getNode()
+ public SimpleQueueEntryImpl getNode()
{
-
return _lastNode;
-
}
public boolean advance()
@@ -154,10 +150,10 @@ public class SimpleQueueEntryList implements QueueEntryList
if(!atTail())
{
- QueueEntryImpl nextNode = _lastNode.nextNode();
- while(nextNode.isDispensed() && nextNode.nextNode() != null)
+ SimpleQueueEntryImpl nextNode = _lastNode.getNextNode();
+ while(nextNode.isDispensed() && nextNode.getNextNode() != null)
{
- nextNode = nextNode.nextNode();
+ nextNode = nextNode.getNextNode();
}
_lastNode = nextNode;
return true;
@@ -173,21 +169,26 @@ public class SimpleQueueEntryList implements QueueEntryList
}
- public QueueEntryIterator iterator()
+ public QueueEntryIteratorImpl iterator()
{
return new QueueEntryIteratorImpl(_head);
}
- public QueueEntry getHead()
+ public SimpleQueueEntryImpl getHead()
{
return _head;
}
+ public void entryDeleted(SimpleQueueEntryImpl queueEntry)
+ {
+ advanceHead();
+ }
+
static class Factory implements QueueEntryListFactory
{
- public QueueEntryList createQueueEntryList(AMQQueue queue)
+ public SimpleQueueEntryList createQueueEntryList(AMQQueue queue)
{
return new SimpleQueueEntryList(queue);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
new file mode 100644
index 0000000000..3f02442704
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
@@ -0,0 +1,30 @@
+package org.apache.qpid.server.queue;
+
+import java.util.Map;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class SortedQueue extends OutOfOrderQueue
+{
+ private String _sortedPropertyName;
+
+ protected SortedQueue(final String name, final boolean durable,
+ final String owner, final boolean autoDelete, final boolean exclusive,
+ final VirtualHost virtualHost, Map<String, Object> arguments, String sortedPropertyName)
+ {
+ super(name, durable, owner, autoDelete, exclusive, virtualHost,
+ new SortedQueueEntryListFactory(sortedPropertyName), arguments);
+ this._sortedPropertyName = sortedPropertyName;
+ }
+
+ public String getSortedPropertyName()
+ {
+ return _sortedPropertyName;
+ }
+
+ public synchronized void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
+ {
+ super.enqueue(message, action);
+ }
+} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryImpl.java
new file mode 100644
index 0000000000..1052adbe67
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryImpl.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.message.ServerMessage;
+
+/**
+ * An implementation of QueueEntryImpl to be used in SortedQueueEntryList.
+ */
+public class SortedQueueEntryImpl extends QueueEntryImpl
+{
+ public static enum Colour
+ {
+ RED, BLACK
+ };
+
+ private volatile SortedQueueEntryImpl _next;
+ private SortedQueueEntryImpl _prev;
+ private String _key;
+
+ private Colour _colour = Colour.BLACK;
+ private SortedQueueEntryImpl _parent;
+ private SortedQueueEntryImpl _left;
+ private SortedQueueEntryImpl _right;
+
+ public SortedQueueEntryImpl(final SortedQueueEntryList queueEntryList)
+ {
+ super(queueEntryList);
+ }
+
+ public SortedQueueEntryImpl(final SortedQueueEntryList queueEntryList,
+ final ServerMessage message, final long entryId)
+ {
+ super(queueEntryList, message, entryId);
+ }
+
+ @Override
+ public int compareTo(final QueueEntry o)
+ {
+ final String otherKey = ((SortedQueueEntryImpl) o)._key;
+ final int compare = _key == null ? (otherKey == null ? 0 : -1) : otherKey == null ? 1 : _key.compareTo(otherKey);
+ return compare == 0 ? super.compareTo(o) : compare;
+ }
+
+ public Colour getColour()
+ {
+ return _colour;
+ }
+
+ public String getKey()
+ {
+ return _key;
+ }
+
+ public SortedQueueEntryImpl getLeft()
+ {
+ return _left;
+ }
+
+ public SortedQueueEntryImpl getNextNode()
+ {
+ return _next;
+ }
+
+ @Override
+ public SortedQueueEntryImpl getNextValidEntry()
+ {
+ return getNextNode();
+ }
+
+ public SortedQueueEntryImpl getParent()
+ {
+ return _parent;
+ }
+
+ public SortedQueueEntryImpl getPrev()
+ {
+ return _prev;
+ }
+
+ public SortedQueueEntryImpl getRight()
+ {
+ return _right;
+ }
+
+ public void setColour(final Colour colour)
+ {
+ _colour = colour;
+ }
+
+ public void setKey(final String key)
+ {
+ _key = key;
+ }
+
+ public void setLeft(final SortedQueueEntryImpl left)
+ {
+ _left = left;
+ }
+
+ public void setNext(final SortedQueueEntryImpl next)
+ {
+ _next = next;
+ }
+
+ public void setParent(final SortedQueueEntryImpl parent)
+ {
+ _parent = parent;
+ }
+
+ public void setPrev(final SortedQueueEntryImpl prev)
+ {
+ _prev = prev;
+ }
+
+ public void setRight(final SortedQueueEntryImpl right)
+ {
+ _right = right;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "(" + (_colour == Colour.RED ? "Red," : "Black,") + _key + ")";
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
new file mode 100644
index 0000000000..5f8ab16c06
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
@@ -0,0 +1,665 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.message.ServerMessage;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.server.queue.SortedQueueEntryImpl.Colour;
+import org.apache.qpid.server.store.StoreContext;
+
+/**
+ * A sorted implementation of QueueEntryList.
+ * Uses the red/black tree algorithm specified in "Introduction to Algorithms".
+ * ISBN-10: 0262033844
+ * ISBN-13: 978-0262033848
+ * @see http://en.wikipedia.org/wiki/Red-black_tree
+ */
+public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl>
+{
+ private final SortedQueueEntryImpl _head;
+ private SortedQueueEntryImpl _root;
+ private long _entryId = Long.MIN_VALUE;
+ private final Object _lock = new Object();
+ private final AMQQueue _queue;
+ private final String _propertyName;
+
+ public SortedQueueEntryList(final AMQQueue queue, final String propertyName)
+ {
+ _queue = queue;
+ _head = new SortedQueueEntryImpl(this);
+ _propertyName = propertyName;
+ }
+
+ @Override
+ public AMQQueue getQueue()
+ {
+ return _queue;
+ }
+
+ @Override
+ public SortedQueueEntryImpl add(final ServerMessage message)
+ {
+ synchronized(_lock)
+ {
+ String key = null;
+ final Object val = message.getMessageHeader().getHeader(_propertyName);
+ if(val != null)
+ {
+ key = val.toString();
+ }
+
+ final SortedQueueEntryImpl entry = new SortedQueueEntryImpl(this,message, ++_entryId);
+ entry.setKey(key);
+
+ insert(entry);
+
+ return entry;
+ }
+ }
+
+ /**
+ * Red Black Tree insert implementation.
+ * @param entry the entry to insert.
+ */
+ private void insert(final SortedQueueEntryImpl entry)
+ {
+ SortedQueueEntryImpl node = _root;
+ if((node = _root) == null)
+ {
+ _root = entry;
+ _head.setNext(entry);
+ entry.setPrev(_head);
+ return;
+ }
+ else
+ {
+ SortedQueueEntryImpl parent = null;
+ while(node != null)
+ {
+ parent = node;
+ if(entry.compareTo(node) < 0)
+ {
+ node = node.getLeft();
+ }
+ else
+ {
+ node = node.getRight();
+ }
+ }
+ entry.setParent(parent);
+
+ if(entry.compareTo(parent) < 0)
+ {
+ parent.setLeft(entry);
+ final SortedQueueEntryImpl prev = parent.getPrev();
+ entry.setNext(parent);
+ prev.setNext(entry);
+ entry.setPrev(prev);
+ parent.setPrev(entry);
+ }
+ else
+ {
+ parent.setRight(entry);
+
+ final SortedQueueEntryImpl next = parent.getNextValidEntry();
+ entry.setNext(next);
+ parent.setNext(entry);
+
+ if(next != null)
+ {
+ next.setPrev(entry);
+ }
+ entry.setPrev(parent);
+ }
+ }
+ entry.setColour(Colour.RED);
+ insertFixup(entry);
+ }
+
+ private void insertFixup(SortedQueueEntryImpl entry)
+ {
+ while(isParentColour(entry, Colour.RED))
+ {
+ final SortedQueueEntryImpl grandparent = nodeGrandparent(entry);
+
+ if(nodeParent(entry) == leftChild(grandparent))
+ {
+ final SortedQueueEntryImpl y = rightChild(grandparent);
+ if(isNodeColour(y, Colour.RED))
+ {
+ setColour(nodeParent(entry), Colour.BLACK);
+ setColour(y, Colour.BLACK);
+ setColour(grandparent, Colour.RED);
+ entry = grandparent;
+ }
+ else
+ {
+ if(entry == rightChild(nodeParent(entry)))
+ {
+ entry = nodeParent(entry);
+ leftRotate(entry);
+ }
+ setColour(nodeParent(entry), Colour.BLACK);
+ setColour(nodeGrandparent(entry), Colour.RED);
+ rightRotate(nodeGrandparent(entry));
+ }
+ }
+ else
+ {
+ final SortedQueueEntryImpl y = leftChild(grandparent);
+ if(isNodeColour(y, Colour.RED))
+ {
+ setColour(nodeParent(entry), Colour.BLACK);
+ setColour(y, Colour.BLACK);
+ setColour(grandparent, Colour.RED);
+ entry = grandparent;
+ }
+ else
+ {
+ if(entry == leftChild(nodeParent(entry)))
+ {
+ entry = nodeParent(entry);
+ rightRotate(entry);
+ }
+ setColour(nodeParent(entry), Colour.BLACK);
+ setColour(nodeGrandparent(entry), Colour.RED);
+ leftRotate(nodeGrandparent(entry));
+ }
+ }
+ }
+ _root.setColour(Colour.BLACK);
+ }
+
+ private void leftRotate(final SortedQueueEntryImpl entry)
+ {
+ if(entry != null)
+ {
+ final SortedQueueEntryImpl rightChild = rightChild(entry);
+ entry.setRight(rightChild.getLeft());
+ if(entry.getRight() != null)
+ {
+ entry.getRight().setParent(entry);
+ }
+ rightChild.setParent(entry.getParent());
+ if(entry.getParent() == null)
+ {
+ _root = rightChild;
+ }
+ else if(entry == entry.getParent().getLeft())
+ {
+ entry.getParent().setLeft(rightChild);
+ }
+ else
+ {
+ entry.getParent().setRight(rightChild);
+ }
+ rightChild.setLeft(entry);
+ entry.setParent(rightChild);
+ }
+ }
+
+ private void rightRotate(final SortedQueueEntryImpl entry)
+ {
+ if(entry != null)
+ {
+ final SortedQueueEntryImpl leftChild = leftChild(entry);
+ entry.setLeft(leftChild.getRight());
+ if(entry.getLeft() != null)
+ {
+ leftChild.getRight().setParent(entry);
+ }
+ leftChild.setParent(entry.getParent());
+ if(leftChild.getParent() == null)
+ {
+ _root = leftChild;
+ }
+ else if(entry == entry.getParent().getRight())
+ {
+ entry.getParent().setRight(leftChild);
+ }
+ else
+ {
+ entry.getParent().setLeft(leftChild);
+ }
+ leftChild.setRight(entry);
+ entry.setParent(leftChild);
+ }
+ }
+
+ private void setColour(final SortedQueueEntryImpl node, final Colour colour)
+ {
+ if(node != null)
+ {
+ node.setColour(colour);
+ }
+ }
+
+ private SortedQueueEntryImpl leftChild(final SortedQueueEntryImpl node)
+ {
+ return node == null ? null : node.getLeft();
+ }
+
+ private SortedQueueEntryImpl rightChild(final SortedQueueEntryImpl node)
+ {
+ return node == null ? null : node.getRight();
+ }
+
+ private SortedQueueEntryImpl nodeParent(final SortedQueueEntryImpl node)
+ {
+ return node == null ? null : node.getParent();
+ }
+
+ private SortedQueueEntryImpl nodeGrandparent(final SortedQueueEntryImpl node)
+ {
+ return nodeParent(nodeParent(node));
+ }
+
+ private boolean isParentColour(final SortedQueueEntryImpl node, final SortedQueueEntryImpl.Colour colour)
+ {
+
+ return node != null && isNodeColour(node.getParent(), colour);
+ }
+
+ protected boolean isNodeColour(final SortedQueueEntryImpl node, final SortedQueueEntryImpl.Colour colour)
+ {
+ return (node == null ? Colour.BLACK : node.getColour()) == colour;
+ }
+
+ @Override
+ public SortedQueueEntryImpl next(final SortedQueueEntryImpl node)
+ {
+ synchronized(_lock)
+ {
+ if(node.isDispensed() && _head != node)
+ {
+ SortedQueueEntryImpl current = _head;
+ SortedQueueEntryImpl next;
+ while(current != null)
+ {
+ next = current.getNextValidEntry();
+ if(current.compareTo(node)>0 && !current.isDispensed())
+ {
+ break;
+ }
+ else
+ {
+ current = next;
+ }
+ }
+ return current;
+ }
+ else
+ {
+ return node.getNextValidEntry();
+ }
+ }
+ }
+
+ @Override
+ public QueueEntryIterator<SortedQueueEntryImpl> iterator()
+ {
+ return new QueueEntryIteratorImpl(_head);
+ }
+
+ @Override
+ public SortedQueueEntryImpl getHead()
+ {
+ return _head;
+ }
+
+ protected SortedQueueEntryImpl getRoot()
+ {
+ return _root;
+ }
+
+ @Override
+ public void entryDeleted(final SortedQueueEntryImpl entry)
+ {
+ synchronized(_lock)
+ {
+ // If the node to be removed has two children, we swap the position
+ // of the node and its successor in the tree
+ if(leftChild(entry) != null && rightChild(entry) != null)
+ {
+ swapWithSuccessor(entry);
+ }
+
+ // Then deal with the easy doubly linked list deletion (need to do
+ // this after the swap as the swap uses next
+ final SortedQueueEntryImpl prev = entry.getPrev();
+ if(prev != null)
+ {
+ prev.setNext(entry.getNextValidEntry());
+ }
+
+ final SortedQueueEntryImpl next = entry.getNextValidEntry();
+ if(next != null)
+ {
+ next.setPrev(prev);
+ }
+
+ // now deal with splicing
+ final SortedQueueEntryImpl chosenChild;
+
+ if(leftChild(entry) != null)
+ {
+ chosenChild = leftChild(entry);
+ }
+ else
+ {
+ chosenChild = rightChild(entry);
+ }
+
+ if(chosenChild != null)
+ {
+ // we have one child (x), we can move it up to replace x;
+ chosenChild.setParent(entry.getParent());
+ if(chosenChild.getParent() == null)
+ {
+ _root = chosenChild;
+ }
+ else if(entry == entry.getParent().getLeft())
+ {
+ entry.getParent().setLeft(chosenChild);
+ }
+ else
+ {
+ entry.getParent().setRight(chosenChild);
+ }
+
+ entry.setLeft(null);
+ entry.setRight(null);
+ entry.setParent(null);
+
+ if(entry.getColour() == Colour.BLACK)
+ {
+ deleteFixup(chosenChild);
+ }
+
+ }
+ else
+ {
+ // no children
+ if(entry.getParent() == null)
+ {
+ // no parent either - the tree is empty
+ _root = null;
+ }
+ else
+ {
+ if(entry.getColour() == Colour.BLACK)
+ {
+ deleteFixup(entry);
+ }
+
+ if(entry.getParent() != null)
+ {
+ if(entry.getParent().getLeft() == entry)
+ {
+ entry.getParent().setLeft(null);
+ }
+ else if(entry.getParent().getRight() == entry)
+ {
+ entry.getParent().setRight(null);
+ }
+ entry.setParent(null);
+ }
+ }
+ }
+
+ }
+ }
+
+ /**
+ * Swaps the position of the node in the tree with it's successor
+ * (that is the node with the next highest key)
+ * @param entry
+ */
+ private void swapWithSuccessor(final SortedQueueEntryImpl entry)
+ {
+ final SortedQueueEntryImpl next = entry.getNextValidEntry();
+ final SortedQueueEntryImpl nextParent = next.getParent();
+ final SortedQueueEntryImpl nextLeft = next.getLeft();
+ final SortedQueueEntryImpl nextRight = next.getRight();
+ final Colour nextColour = next.getColour();
+
+ // Special case - the successor is the right child of the node
+ if(next == entry.getRight())
+ {
+ next.setParent(entry.getParent());
+ if(next.getParent() == null)
+ {
+ _root = next;
+ }
+ else if(next.getParent().getLeft() == entry)
+ {
+ next.getParent().setLeft(next);
+ }
+ else
+ {
+ next.getParent().setRight(next);
+ }
+
+ next.setRight(entry);
+ entry.setParent(next);
+ next.setLeft(entry.getLeft());
+
+ if(next.getLeft() != null)
+ {
+ next.getLeft().setParent(next);
+ }
+
+ next.setColour(entry.getColour());
+ entry.setColour(nextColour);
+ entry.setLeft(nextLeft);
+
+ if(nextLeft != null)
+ {
+ nextLeft.setParent(entry);
+ }
+ entry.setRight(nextRight);
+ if(nextRight != null)
+ {
+ nextRight.setParent(entry);
+ }
+ }
+ else
+ {
+ next.setParent(entry.getParent());
+ if(next.getParent() == null)
+ {
+ _root = next;
+ }
+ else if(next.getParent().getLeft() == entry)
+ {
+ next.getParent().setLeft(next);
+ }
+ else
+ {
+ next.getParent().setRight(next);
+ }
+
+ next.setLeft(entry.getLeft());
+ if(next.getLeft() != null)
+ {
+ next.getLeft().setParent(next);
+ }
+ next.setRight(entry.getRight());
+ if(next.getRight() != null)
+ {
+ next.getRight().setParent(next);
+ }
+ next.setColour(entry.getColour());
+
+ entry.setParent(nextParent);
+ if(nextParent.getLeft() == next)
+ {
+ nextParent.setLeft(entry);
+ }
+ else
+ {
+ nextParent.setRight(entry);
+ }
+
+ entry.setLeft(nextLeft);
+ if(nextLeft != null)
+ {
+ nextLeft.setParent(entry);
+ }
+ entry.setRight(nextRight);
+ if(nextRight != null)
+ {
+ nextRight.setParent(entry);
+ }
+ entry.setColour(nextColour);
+ }
+ }
+
+ private void deleteFixup(SortedQueueEntryImpl entry)
+ {
+ int i = 0;
+ while(entry != null && entry != _root
+ && isNodeColour(entry, Colour.BLACK))
+ {
+ i++;
+
+ if(i > 1000)
+ {
+ return;
+ }
+
+ if(entry == leftChild(nodeParent(entry)))
+ {
+ SortedQueueEntryImpl rightSibling = rightChild(nodeParent(entry));
+ if(isNodeColour(rightSibling, Colour.RED))
+ {
+ setColour(rightSibling, Colour.BLACK);
+ nodeParent(entry).setColour(Colour.RED);
+ leftRotate(nodeParent(entry));
+ rightSibling = rightChild(nodeParent(entry));
+ }
+
+ if(isNodeColour(leftChild(rightSibling), Colour.BLACK)
+ && isNodeColour(rightChild(rightSibling), Colour.BLACK))
+ {
+ setColour(rightSibling, Colour.RED);
+ entry = nodeParent(entry);
+ }
+ else
+ {
+ if(isNodeColour(rightChild(rightSibling), Colour.BLACK))
+ {
+ setColour(leftChild(rightSibling), Colour.BLACK);
+ rightSibling.setColour(Colour.RED);
+ rightRotate(rightSibling);
+ rightSibling = rightChild(nodeParent(entry));
+ }
+ setColour(rightSibling, getColour(nodeParent(entry)));
+ setColour(nodeParent(entry), Colour.BLACK);
+ setColour(rightChild(rightSibling), Colour.BLACK);
+ leftRotate(nodeParent(entry));
+ entry = _root;
+ }
+ }
+ else
+ {
+ SortedQueueEntryImpl leftSibling = leftChild(nodeParent(entry));
+ if(isNodeColour(leftSibling, Colour.RED))
+ {
+ setColour(leftSibling, Colour.BLACK);
+ nodeParent(entry).setColour(Colour.RED);
+ rightRotate(nodeParent(entry));
+ leftSibling = leftChild(nodeParent(entry));
+ }
+
+ if(isNodeColour(leftChild(leftSibling), Colour.BLACK)
+ && isNodeColour(rightChild(leftSibling), Colour.BLACK))
+ {
+ setColour(leftSibling, Colour.RED);
+ entry = nodeParent(entry);
+ }
+ else
+ {
+ if(isNodeColour(leftChild(leftSibling), Colour.BLACK))
+ {
+ setColour(rightChild(leftSibling), Colour.BLACK);
+ leftSibling.setColour(Colour.RED);
+ leftRotate(leftSibling);
+ leftSibling = leftChild(nodeParent(entry));
+ }
+ setColour(leftSibling, getColour(nodeParent(entry)));
+ setColour(nodeParent(entry), Colour.BLACK);
+ setColour(leftChild(leftSibling), Colour.BLACK);
+ rightRotate(nodeParent(entry));
+ entry = _root;
+ }
+ }
+ }
+ setColour(entry, Colour.BLACK);
+ }
+
+ private Colour getColour(final SortedQueueEntryImpl x)
+ {
+ return x == null ? null : x.getColour();
+ }
+
+ public class QueueEntryIteratorImpl implements QueueEntryIterator<SortedQueueEntryImpl>
+ {
+ private SortedQueueEntryImpl _lastNode;
+
+ public QueueEntryIteratorImpl(final SortedQueueEntryImpl startNode)
+ {
+ _lastNode = startNode;
+ }
+
+ public boolean atTail()
+ {
+ return next(_lastNode) == null;
+ }
+
+ public SortedQueueEntryImpl getNode()
+ {
+ return _lastNode;
+ }
+
+ public boolean advance()
+ {
+ if(!atTail())
+ {
+ SortedQueueEntryImpl nextNode = next(_lastNode);
+ while(nextNode.isDispensed() && next(nextNode) != null)
+ {
+ nextNode = next(nextNode);
+ }
+ _lastNode = nextNode;
+ return true;
+
+ }
+ else
+ {
+ return false;
+ }
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java
new file mode 100644
index 0000000000..7a70795e77
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java
@@ -0,0 +1,19 @@
+package org.apache.qpid.server.queue;
+
+public class SortedQueueEntryListFactory implements QueueEntryListFactory
+{
+
+ private final String _propertyName;
+
+ public SortedQueueEntryListFactory(final String propertyName)
+ {
+ _propertyName = propertyName;
+ }
+
+ @Override
+ public QueueEntryList<SortedQueueEntryImpl> createQueueEntryList(final AMQQueue queue)
+ {
+ return new SortedQueueEntryList(queue, _propertyName);
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java
index 8bb5d02b01..a43be30b85 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java
@@ -20,17 +20,16 @@
*/
package org.apache.qpid.tools.messagestore.commands;
+import java.io.UnsupportedEncodingException;
+import java.util.LinkedList;
+import java.util.List;
import org.apache.commons.codec.binary.Hex;
-import org.apache.qpid.server.queue.QueueEntryImpl;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.SimpleQueueEntryImpl;
import org.apache.qpid.tools.messagestore.MessageStoreTool;
import org.apache.qpid.tools.utils.Console;
-import java.io.UnsupportedEncodingException;
-import java.util.LinkedList;
-import java.util.List;
-
public class Dump extends Show
{
private static final int LINE_SIZE = 8;
@@ -259,7 +258,7 @@ public class Dump extends Show
String title, boolean routing, boolean headers, boolean messageHeaders)
{
List<QueueEntry> single = new LinkedList<QueueEntry>();
- single.add(new QueueEntryImpl(null,msg, Long.MIN_VALUE));
+ single.add(new SimpleQueueEntryImpl(null,msg, Long.MIN_VALUE));
List<List> routingData = super.createMessageData(null, single, headers, routing, messageHeaders);