diff options
| author | Keith Wall <kwall@apache.org> | 2011-11-20 23:37:48 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2011-11-20 23:37:48 +0000 |
| commit | 26df8c6fd2959210deab55d0c57a2f98ef7d6542 (patch) | |
| tree | 9e378550276888ddd766085bad93994f3f892f17 /qpid/java/broker/src/main | |
| parent | 10a2d3074f254794ae8443994e60b0e4e8c0b56a (diff) | |
| download | qpid-python-26df8c6fd2959210deab55d0c57a2f98ef7d6542.tar.gz | |
QPID-3622: Add Sorted Queue funtionality
Applied patch from Andrew MacBean <andymacbean@gmail.com> and Oleksandr Rudyy<orudyy@gmail.com>
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1204295 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/main')
17 files changed, 1126 insertions, 207 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java index 4512de6fb4..31c683b548 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java @@ -62,7 +62,8 @@ public class QueueConfiguration extends ConfigurationPlugin "capacity", "flowResumeCapacity", "lvq", - "lvqKey" + "lvqKey", + "sortKey" }; } @@ -167,6 +168,10 @@ public class QueueConfiguration extends ConfigurationPlugin return getStringValue("lvqKey", null); } + public String getQueueSortKey() + { + return getStringValue("sortKey", null); + } public static class QueueConfig extends ConfigurationPlugin { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java index 371ae0de50..2c04a626ff 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java @@ -20,71 +20,25 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.subscription.SubscriptionList; -import org.apache.qpid.server.virtualhost.VirtualHost; - import java.util.Map; +import org.apache.qpid.server.virtualhost.VirtualHost; -public class AMQPriorityQueue extends SimpleAMQQueue +public class AMQPriorityQueue extends OutOfOrderQueue { - protected AMQPriorityQueue(final AMQShortString name, - final boolean durable, - final AMQShortString owner, - final boolean autoDelete, - boolean exclusive, - final VirtualHost virtualHost, - int priorities, Map<String, Object> arguments) - { - super(name, durable, owner, autoDelete, exclusive, virtualHost,new PriorityQueueList.Factory(priorities), arguments); - } - - public AMQPriorityQueue(String queueName, - boolean durable, - String owner, - boolean autoDelete, - boolean exclusive, VirtualHost virtualHost, int priorities, Map<String,Object> arguments) + protected AMQPriorityQueue(final String name, + final boolean durable, + final String owner, + final boolean autoDelete, + boolean exclusive, + final VirtualHost virtualHost, + Map<String, Object> arguments, + int priorities) { - this(queueName == null ? null : new AMQShortString(queueName), durable, owner == null ? null : new AMQShortString(owner), - autoDelete, exclusive,virtualHost, priorities, arguments); + super(name, durable, owner, autoDelete, exclusive, virtualHost, new PriorityQueueList.Factory(priorities), arguments); } public int getPriorities() { return ((PriorityQueueList) _entries).getPriorities(); } - - @Override - protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry) - { - // check that all subscriptions are not in advance of the entry - SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator(); - while(subIter.advance() && entry.isAvailable()) - { - final Subscription subscription = subIter.getNode().getSubscription(); - if(!subscription.isClosed()) - { - QueueContext context = (QueueContext) subscription.getQueueContext(); - if(context != null) - { - QueueEntry subnode = context._lastSeenEntry; - QueueEntry released = context._releasedEntry; - while(subnode != null && entry.compareTo(subnode) < 0 && entry.isAvailable() && (released == null || released.compareTo(entry) < 0)) - { - if(QueueContext._releasedUpdater.compareAndSet(context,released,entry)) - { - break; - } - else - { - subnode = context._lastSeenEntry; - released = context._releasedEntry; - } - } - } - } - - } - } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index bee55118ba..e4a6f01930 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -20,22 +20,22 @@ */ package org.apache.qpid.server.queue; +import java.util.HashMap; +import java.util.Map; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.configuration.QueueConfiguration; - -import java.util.Map; -import java.util.HashMap; +import org.apache.qpid.server.virtualhost.VirtualHost; public class AMQQueueFactory { - public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities"); + public static final String X_QPID_PRIORITIES = "x-qpid-priorities"; public static final String QPID_LVQ_KEY = "qpid.LVQ_key"; public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue"; public static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key"; + public static final String QPID_QUEUE_SORT_KEY = "qpid.queue_sort_key"; private abstract static class QueueProperty { @@ -157,9 +157,11 @@ public class AMQQueueFactory String description = "Permission denied: queue-name '" + queueName + "'"; throw new AMQSecurityException(description); } - + int priorities = 1; String conflationKey = null; + String sortingKey = null; + if(arguments != null) { if(arguments.containsKey(QPID_LAST_VALUE_QUEUE) || arguments.containsKey(QPID_LAST_VALUE_QUEUE_KEY)) @@ -170,24 +172,32 @@ public class AMQQueueFactory conflationKey = QPID_LVQ_KEY; } } - else if(arguments.containsKey(X_QPID_PRIORITIES.toString())) + else if(arguments.containsKey(X_QPID_PRIORITIES)) { - Object prioritiesObj = arguments.get(X_QPID_PRIORITIES.toString()); + Object prioritiesObj = arguments.get(X_QPID_PRIORITIES); if(prioritiesObj instanceof Number) { priorities = ((Number)prioritiesObj).intValue(); } } + else if(arguments.containsKey(QPID_QUEUE_SORT_KEY)) + { + sortingKey = (String)arguments.get(QPID_QUEUE_SORT_KEY); + } } AMQQueue q; - if(conflationKey != null) + if(sortingKey != null) + { + q = new SortedQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, sortingKey); + } + else if(conflationKey != null) { q = new ConflationQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, conflationKey); } else if(priorities > 1) { - q = new AMQPriorityQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, priorities, arguments); + q = new AMQPriorityQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, priorities); } else { @@ -223,26 +233,22 @@ public class AMQQueueFactory boolean exclusive = config.getExclusive(); String owner = config.getOwner(); Map<String,Object> arguments = null; + if(config.isLVQ() || config.getLVQKey() != null) { - arguments = new HashMap<String,Object>(); arguments.put(QPID_LAST_VALUE_QUEUE, 1); arguments.put(QPID_LAST_VALUE_QUEUE_KEY, config.getLVQKey() == null ? QPID_LVQ_KEY : config.getLVQKey()); } - else + else if (config.getPriority() || config.getPriorities() > 0) { - boolean priority = config.getPriority(); - int priorities = config.getPriorities(); - if(priority || priorities > 0) - { - arguments = new HashMap<String,Object>(); - if (priorities < 0) - { - priorities = 10; - } - arguments.put("x-qpid-priorities", priorities); - } + arguments = new HashMap<String,Object>(); + arguments.put(X_QPID_PRIORITIES, config.getPriorities() < 0 ? 10 : config.getPriorities()); + } + else if (config.getQueueSortKey() != null && !"".equals(config.getQueueSortKey())) + { + arguments = new HashMap<String,Object>(); + arguments.put(QPID_QUEUE_SORT_KEY, config.getQueueSortKey()); } AMQQueue q = createAMQQueueImpl(queueName, durable, owner, autodelete, exclusive, host, arguments); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java index 2c1883e763..c4762c98c9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java @@ -54,7 +54,7 @@ public class ConflationQueueList extends SimpleQueueEntryList @Override - public QueueEntry add(final ServerMessage message) + public ConflationQueueEntry add(final ServerMessage message) { ConflationQueueEntry entry = (ConflationQueueEntry) (super.add(message)); AtomicReference<QueueEntry> latestValueReference = null; @@ -117,7 +117,7 @@ public class ConflationQueueList extends SimpleQueueEntryList } } - private final class ConflationQueueEntry extends QueueEntryImpl + private final class ConflationQueueEntry extends SimpleQueueEntryImpl { @@ -158,7 +158,7 @@ public class ConflationQueueList extends SimpleQueueEntryList _conflationKey = conflationKey; } - public QueueEntryList createQueueEntryList(AMQQueue queue) + public ConflationQueueList createQueueEntryList(AMQQueue queue) { return new ConflationQueueList(queue, _conflationKey); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java new file mode 100644 index 0000000000..b16d1eb8e3 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java @@ -0,0 +1,53 @@ +package org.apache.qpid.server.queue; + +import java.util.Map; +import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.subscription.SubscriptionList; +import org.apache.qpid.server.virtualhost.VirtualHost; + +public abstract class OutOfOrderQueue extends SimpleAMQQueue +{ + protected OutOfOrderQueue(String name, boolean durable, String owner, + boolean autoDelete, boolean exclusive, VirtualHost virtualHost, + QueueEntryListFactory entryListFactory, Map<String, Object> arguments) + { + super(name, durable, owner, autoDelete, exclusive, virtualHost, entryListFactory, arguments); + } + + @Override + protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry) + { + // check that all subscriptions are not in advance of the entry + SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator(); + while(subIter.advance() && !entry.isAcquired()) + { + final Subscription subscription = subIter.getNode().getSubscription(); + if(!subscription.isClosed()) + { + QueueContext context = (QueueContext) subscription.getQueueContext(); + if(context != null) + { + QueueEntry subnode = context._lastSeenEntry; + QueueEntry released = context._releasedEntry; + + while(subnode != null && entry.compareTo(subnode) < 0 && !entry.isAcquired() + && (released == null || released.compareTo(entry) > 0)) + { + if(QueueContext._releasedUpdater.compareAndSet(context,released,entry)) + { + break; + } + else + { + subnode = context._lastSeenEntry; + released = context._releasedEntry; + } + + } + } + } + + } + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java index 0c6b84d2b6..79d3ab5bd0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java @@ -20,21 +20,19 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.framing.CommonContentHeaderProperties; -import org.apache.qpid.AMQException; import org.apache.qpid.server.message.ServerMessage; -public class PriorityQueueList implements QueueEntryList +public class PriorityQueueList implements QueueEntryList<SimpleQueueEntryImpl> { private final AMQQueue _queue; - private final QueueEntryList[] _priorityLists; + private final SimpleQueueEntryList[] _priorityLists; private final int _priorities; private final int _priorityOffset; public PriorityQueueList(AMQQueue queue, int priorities) { _queue = queue; - _priorityLists = new QueueEntryList[priorities]; + _priorityLists = new SimpleQueueEntryList[priorities]; _priorities = priorities; _priorityOffset = 5-((priorities + 1)/2); for(int i = 0; i < priorities; i++) @@ -53,7 +51,7 @@ public class PriorityQueueList implements QueueEntryList return _queue; } - public QueueEntry add(ServerMessage message) + public SimpleQueueEntryImpl add(ServerMessage message) { int index = message.getMessageHeader().getPriority() - _priorityOffset; if(index >= _priorities) @@ -68,31 +66,30 @@ public class PriorityQueueList implements QueueEntryList } - public QueueEntry next(QueueEntry node) + public SimpleQueueEntryImpl next(SimpleQueueEntryImpl node) { - QueueEntryImpl nodeImpl = (QueueEntryImpl)node; - QueueEntry next = nodeImpl.getNext(); + SimpleQueueEntryImpl next = node.getNextValidEntry(); if(next == null) { - QueueEntryList nodeEntryList = nodeImpl.getQueueEntryList(); + final QueueEntryList<?> nodeEntryList = node.getQueueEntryList(); int index; for(index = _priorityLists.length-1; _priorityLists[index] != nodeEntryList; index--); while(next == null && index != 0) { index--; - next = ((QueueEntryImpl)_priorityLists[index].getHead()).getNext(); + next = _priorityLists[index].getHead().getNextValidEntry(); } } return next; } - private final class PriorityQueueEntryListIterator implements QueueEntryIterator + private final class PriorityQueueEntryListIterator implements QueueEntryIterator<SimpleQueueEntryImpl> { - private final QueueEntryIterator[] _iterators = new QueueEntryIterator[ _priorityLists.length ]; - private QueueEntry _lastNode; + private final SimpleQueueEntryList.QueueEntryIteratorImpl[] _iterators = new SimpleQueueEntryList.QueueEntryIteratorImpl[ _priorityLists.length ]; + private SimpleQueueEntryImpl _lastNode; PriorityQueueEntryListIterator() { @@ -116,7 +113,7 @@ public class PriorityQueueList implements QueueEntryList return true; } - public QueueEntry getNode() + public SimpleQueueEntryImpl getNode() { return _lastNode; } @@ -135,16 +132,21 @@ public class PriorityQueueList implements QueueEntryList } } - public QueueEntryIterator iterator() + public PriorityQueueEntryListIterator iterator() { return new PriorityQueueEntryListIterator(); } - public QueueEntry getHead() + public SimpleQueueEntryImpl getHead() { return _priorityLists[_priorities-1].getHead(); } + public void entryDeleted(final SimpleQueueEntryImpl queueEntry) + { + + } + static class Factory implements QueueEntryListFactory { private final int _priorities; @@ -154,7 +156,7 @@ public class PriorityQueueList implements QueueEntryList _priorities = priorities; } - public QueueEntryList createQueueEntryList(AMQQueue queue) + public PriorityQueueList createQueueEntryList(AMQQueue queue) { return new PriorityQueueList(queue, _priorities); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index be29245901..c1fb0258fa 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -214,6 +214,10 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable boolean isQueueDeleted(); + QueueEntry getNextNode(); + + QueueEntry getNextValidEntry(); + void addStateChangeListener(StateChangeListener listener); boolean removeStateChangeListener(StateChangeListener listener); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 3d011b99c0..ee1d214c1f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -38,16 +38,11 @@ import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; - -public class QueueEntryImpl implements QueueEntry +public abstract class QueueEntryImpl implements QueueEntry { - - /** - * Used for debugging purposes. - */ private static final Logger _log = Logger.getLogger(QueueEntryImpl.class); - private final SimpleQueueEntryList _queueEntryList; + private final QueueEntryList _queueEntryList; private MessageReference _message; @@ -80,22 +75,20 @@ public class QueueEntryImpl implements QueueEntry private volatile long _entryId; - volatile QueueEntryImpl _next; - private static final int DELIVERED_TO_CONSUMER = 1; private static final int REDELIVERED = 2; private volatile int _deliveryState; - QueueEntryImpl(SimpleQueueEntryList queueEntryList) + public QueueEntryImpl(QueueEntryList<?> queueEntryList) { this(queueEntryList,null,Long.MIN_VALUE); _state = DELETED_STATE; } - public QueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message, final long entryId) + public QueueEntryImpl(QueueEntryList<?> queueEntryList, ServerMessage message, final long entryId) { _queueEntryList = queueEntryList; @@ -104,7 +97,7 @@ public class QueueEntryImpl implements QueueEntry _entryIdUpdater.set(this, entryId); } - public QueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message) + public QueueEntryImpl(QueueEntryList<?> queueEntryList, ServerMessage message) { _queueEntryList = queueEntryList; _message = message == null ? null : message.newReference(); @@ -316,16 +309,15 @@ public class QueueEntryImpl implements QueueEntry public Subscription getDeliveredSubscription() { - EntryState state = _state; - if (state instanceof SubscriptionAcquiredState) - { - return ((SubscriptionAcquiredState) state).getSubscription(); - } - else - { - return null; - } - + EntryState state = _state; + if (state instanceof SubscriptionAcquiredState) + { + return ((SubscriptionAcquiredState) state).getSubscription(); + } + else + { + return null; + } } public void reject() @@ -497,33 +489,6 @@ public class QueueEntryImpl implements QueueEntry return getEntryId() > other.getEntryId() ? 1 : getEntryId() < other.getEntryId() ? -1 : 0; } - public QueueEntryImpl getNext() - { - - QueueEntryImpl next = nextNode(); - while(next != null && next.isDispensed() ) - { - - final QueueEntryImpl newNext = next.nextNode(); - if(newNext != null) - { - SimpleQueueEntryList._nextUpdater.compareAndSet(this,next, newNext); - next = nextNode(); - } - else - { - next = null; - } - - } - return next; - } - - QueueEntryImpl nextNode() - { - return _next; - } - public boolean isDeleted() { return _state == DELETED_STATE; @@ -535,7 +500,7 @@ public class QueueEntryImpl implements QueueEntry if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE)) { - _queueEntryList.advanceHead(); + _queueEntryList.entryDeleted(this); return true; } else diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java index c5c115a2d1..73ebb0f300 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java @@ -20,11 +20,11 @@ */ package org.apache.qpid.server.queue; -public interface QueueEntryIterator +public interface QueueEntryIterator<QE extends QueueEntry> { boolean atTail(); - QueueEntry getNode(); + QE getNode(); boolean advance(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java index b4042ce02c..77c4b912e0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java @@ -22,15 +22,17 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.message.ServerMessage; -public interface QueueEntryList +public interface QueueEntryList<Q extends QueueEntry> { AMQQueue getQueue(); - QueueEntry add(ServerMessage message); + Q add(ServerMessage message); - QueueEntry next(QueueEntry node); + Q next(Q node); - QueueEntryIterator iterator(); + QueueEntryIterator<Q> iterator(); - QueueEntry getHead(); + Q getHead(); + + void entryDeleted(Q queueEntry); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java new file mode 100644 index 0000000000..0707dc045c --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java @@ -0,0 +1,71 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.message.ServerMessage; + +public class SimpleQueueEntryImpl extends QueueEntryImpl +{ + volatile SimpleQueueEntryImpl _next; + + public SimpleQueueEntryImpl(SimpleQueueEntryList queueEntryList) + { + super(queueEntryList); + } + + public SimpleQueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message, final long entryId) + { + super(queueEntryList, message, entryId); + } + + public SimpleQueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message) + { + super(queueEntryList, message); + } + + public SimpleQueueEntryImpl getNextNode() + { + return _next; + } + + public SimpleQueueEntryImpl getNextValidEntry() + { + + SimpleQueueEntryImpl next = getNextNode(); + while(next != null && next.isDispensed()) + { + + final SimpleQueueEntryImpl newNext = next.getNextNode(); + if(newNext != null) + { + SimpleQueueEntryList._nextUpdater.compareAndSet(this,next, newNext); + next = getNextNode(); + } + else + { + next = null; + } + + } + return next; + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java index 46baab8c85..0bb5dcc219 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java @@ -1,10 +1,3 @@ -package org.apache.qpid.server.queue; - -import org.apache.qpid.server.message.ServerMessage; - -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import java.util.concurrent.atomic.AtomicLong; - /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -25,25 +18,31 @@ import java.util.concurrent.atomic.AtomicLong; * under the License. * */ -public class SimpleQueueEntryList implements QueueEntryList +package org.apache.qpid.server.queue; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import org.apache.qpid.server.message.ServerMessage; + +public class SimpleQueueEntryList implements QueueEntryList<SimpleQueueEntryImpl> { - private final QueueEntryImpl _head; + private final SimpleQueueEntryImpl _head; - private volatile QueueEntryImpl _tail; + private volatile SimpleQueueEntryImpl _tail; - static final AtomicReferenceFieldUpdater<SimpleQueueEntryList, QueueEntryImpl> + static final AtomicReferenceFieldUpdater<SimpleQueueEntryList, SimpleQueueEntryImpl> _tailUpdater = AtomicReferenceFieldUpdater.newUpdater - (SimpleQueueEntryList.class, QueueEntryImpl.class, "_tail"); + (SimpleQueueEntryList.class, SimpleQueueEntryImpl.class, "_tail"); private final AMQQueue _queue; - static final AtomicReferenceFieldUpdater<QueueEntryImpl, QueueEntryImpl> + static final AtomicReferenceFieldUpdater<SimpleQueueEntryImpl, SimpleQueueEntryImpl> _nextUpdater = AtomicReferenceFieldUpdater.newUpdater - (QueueEntryImpl.class, QueueEntryImpl.class, "_next"); + (SimpleQueueEntryImpl.class, SimpleQueueEntryImpl.class, "_next"); private AtomicLong _scavenges = new AtomicLong(0L); private final long _scavengeCount = Integer.getInteger("qpid.queue.scavenge_count", 50); @@ -52,14 +51,14 @@ public class SimpleQueueEntryList implements QueueEntryList public SimpleQueueEntryList(AMQQueue queue) { _queue = queue; - _head = new QueueEntryImpl(this); + _head = new SimpleQueueEntryImpl(this); _tail = _head; } void advanceHead() { - QueueEntryImpl next = _head.nextNode(); - QueueEntryImpl newNext = _head.getNext(); + SimpleQueueEntryImpl next = _head.getNextNode(); + SimpleQueueEntryImpl newNext = _head.getNextValidEntry(); if (next == newNext) { @@ -73,11 +72,11 @@ public class SimpleQueueEntryList implements QueueEntryList void scavenge() { - QueueEntryImpl next = _head.getNext(); + SimpleQueueEntryImpl next = _head.getNextValidEntry(); while (next != null) { - next = next.getNext(); + next = next.getNextValidEntry(); } } @@ -88,13 +87,13 @@ public class SimpleQueueEntryList implements QueueEntryList } - public QueueEntry add(ServerMessage message) + public SimpleQueueEntryImpl add(ServerMessage message) { - QueueEntryImpl node = createQueueEntry(message); + SimpleQueueEntryImpl node = createQueueEntry(message); for (;;) { - QueueEntryImpl tail = _tail; - QueueEntryImpl next = tail.nextNode(); + SimpleQueueEntryImpl tail = _tail; + SimpleQueueEntryImpl next = tail.getNextNode(); if (tail == _tail) { if (next == null) @@ -115,23 +114,22 @@ public class SimpleQueueEntryList implements QueueEntryList } } - protected QueueEntryImpl createQueueEntry(ServerMessage message) + protected SimpleQueueEntryImpl createQueueEntry(ServerMessage message) { - return new QueueEntryImpl(this, message); + return new SimpleQueueEntryImpl(this, message); } - public QueueEntry next(QueueEntry node) + public SimpleQueueEntryImpl next(SimpleQueueEntryImpl node) { - return ((QueueEntryImpl)node).getNext(); + return node.getNextValidEntry(); } - - public static class QueueEntryIteratorImpl implements QueueEntryIterator + public static class QueueEntryIteratorImpl implements QueueEntryIterator<SimpleQueueEntryImpl> { - private QueueEntryImpl _lastNode; + private SimpleQueueEntryImpl _lastNode; - QueueEntryIteratorImpl(QueueEntryImpl startNode) + QueueEntryIteratorImpl(SimpleQueueEntryImpl startNode) { _lastNode = startNode; } @@ -139,14 +137,12 @@ public class SimpleQueueEntryList implements QueueEntryList public boolean atTail() { - return _lastNode.nextNode() == null; + return _lastNode.getNextNode() == null; } - public QueueEntry getNode() + public SimpleQueueEntryImpl getNode() { - return _lastNode; - } public boolean advance() @@ -154,10 +150,10 @@ public class SimpleQueueEntryList implements QueueEntryList if(!atTail()) { - QueueEntryImpl nextNode = _lastNode.nextNode(); - while(nextNode.isDispensed() && nextNode.nextNode() != null) + SimpleQueueEntryImpl nextNode = _lastNode.getNextNode(); + while(nextNode.isDispensed() && nextNode.getNextNode() != null) { - nextNode = nextNode.nextNode(); + nextNode = nextNode.getNextNode(); } _lastNode = nextNode; return true; @@ -173,21 +169,26 @@ public class SimpleQueueEntryList implements QueueEntryList } - public QueueEntryIterator iterator() + public QueueEntryIteratorImpl iterator() { return new QueueEntryIteratorImpl(_head); } - public QueueEntry getHead() + public SimpleQueueEntryImpl getHead() { return _head; } + public void entryDeleted(SimpleQueueEntryImpl queueEntry) + { + advanceHead(); + } + static class Factory implements QueueEntryListFactory { - public QueueEntryList createQueueEntryList(AMQQueue queue) + public SimpleQueueEntryList createQueueEntryList(AMQQueue queue) { return new SimpleQueueEntryList(queue); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java new file mode 100644 index 0000000000..3f02442704 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java @@ -0,0 +1,30 @@ +package org.apache.qpid.server.queue; + +import java.util.Map; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.virtualhost.VirtualHost; + +public class SortedQueue extends OutOfOrderQueue +{ + private String _sortedPropertyName; + + protected SortedQueue(final String name, final boolean durable, + final String owner, final boolean autoDelete, final boolean exclusive, + final VirtualHost virtualHost, Map<String, Object> arguments, String sortedPropertyName) + { + super(name, durable, owner, autoDelete, exclusive, virtualHost, + new SortedQueueEntryListFactory(sortedPropertyName), arguments); + this._sortedPropertyName = sortedPropertyName; + } + + public String getSortedPropertyName() + { + return _sortedPropertyName; + } + + public synchronized void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException + { + super.enqueue(message, action); + } +}
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryImpl.java new file mode 100644 index 0000000000..1052adbe67 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryImpl.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.message.ServerMessage; + +/** + * An implementation of QueueEntryImpl to be used in SortedQueueEntryList. + */ +public class SortedQueueEntryImpl extends QueueEntryImpl +{ + public static enum Colour + { + RED, BLACK + }; + + private volatile SortedQueueEntryImpl _next; + private SortedQueueEntryImpl _prev; + private String _key; + + private Colour _colour = Colour.BLACK; + private SortedQueueEntryImpl _parent; + private SortedQueueEntryImpl _left; + private SortedQueueEntryImpl _right; + + public SortedQueueEntryImpl(final SortedQueueEntryList queueEntryList) + { + super(queueEntryList); + } + + public SortedQueueEntryImpl(final SortedQueueEntryList queueEntryList, + final ServerMessage message, final long entryId) + { + super(queueEntryList, message, entryId); + } + + @Override + public int compareTo(final QueueEntry o) + { + final String otherKey = ((SortedQueueEntryImpl) o)._key; + final int compare = _key == null ? (otherKey == null ? 0 : -1) : otherKey == null ? 1 : _key.compareTo(otherKey); + return compare == 0 ? super.compareTo(o) : compare; + } + + public Colour getColour() + { + return _colour; + } + + public String getKey() + { + return _key; + } + + public SortedQueueEntryImpl getLeft() + { + return _left; + } + + public SortedQueueEntryImpl getNextNode() + { + return _next; + } + + @Override + public SortedQueueEntryImpl getNextValidEntry() + { + return getNextNode(); + } + + public SortedQueueEntryImpl getParent() + { + return _parent; + } + + public SortedQueueEntryImpl getPrev() + { + return _prev; + } + + public SortedQueueEntryImpl getRight() + { + return _right; + } + + public void setColour(final Colour colour) + { + _colour = colour; + } + + public void setKey(final String key) + { + _key = key; + } + + public void setLeft(final SortedQueueEntryImpl left) + { + _left = left; + } + + public void setNext(final SortedQueueEntryImpl next) + { + _next = next; + } + + public void setParent(final SortedQueueEntryImpl parent) + { + _parent = parent; + } + + public void setPrev(final SortedQueueEntryImpl prev) + { + _prev = prev; + } + + public void setRight(final SortedQueueEntryImpl right) + { + _right = right; + } + + @Override + public String toString() + { + return "(" + (_colour == Colour.RED ? "Red," : "Black,") + _key + ")"; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java new file mode 100644 index 0000000000..5f8ab16c06 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java @@ -0,0 +1,665 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.message.ServerMessage; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.server.queue.SortedQueueEntryImpl.Colour; +import org.apache.qpid.server.store.StoreContext; + +/** + * A sorted implementation of QueueEntryList. + * Uses the red/black tree algorithm specified in "Introduction to Algorithms". + * ISBN-10: 0262033844 + * ISBN-13: 978-0262033848 + * @see http://en.wikipedia.org/wiki/Red-black_tree + */ +public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl> +{ + private final SortedQueueEntryImpl _head; + private SortedQueueEntryImpl _root; + private long _entryId = Long.MIN_VALUE; + private final Object _lock = new Object(); + private final AMQQueue _queue; + private final String _propertyName; + + public SortedQueueEntryList(final AMQQueue queue, final String propertyName) + { + _queue = queue; + _head = new SortedQueueEntryImpl(this); + _propertyName = propertyName; + } + + @Override + public AMQQueue getQueue() + { + return _queue; + } + + @Override + public SortedQueueEntryImpl add(final ServerMessage message) + { + synchronized(_lock) + { + String key = null; + final Object val = message.getMessageHeader().getHeader(_propertyName); + if(val != null) + { + key = val.toString(); + } + + final SortedQueueEntryImpl entry = new SortedQueueEntryImpl(this,message, ++_entryId); + entry.setKey(key); + + insert(entry); + + return entry; + } + } + + /** + * Red Black Tree insert implementation. + * @param entry the entry to insert. + */ + private void insert(final SortedQueueEntryImpl entry) + { + SortedQueueEntryImpl node = _root; + if((node = _root) == null) + { + _root = entry; + _head.setNext(entry); + entry.setPrev(_head); + return; + } + else + { + SortedQueueEntryImpl parent = null; + while(node != null) + { + parent = node; + if(entry.compareTo(node) < 0) + { + node = node.getLeft(); + } + else + { + node = node.getRight(); + } + } + entry.setParent(parent); + + if(entry.compareTo(parent) < 0) + { + parent.setLeft(entry); + final SortedQueueEntryImpl prev = parent.getPrev(); + entry.setNext(parent); + prev.setNext(entry); + entry.setPrev(prev); + parent.setPrev(entry); + } + else + { + parent.setRight(entry); + + final SortedQueueEntryImpl next = parent.getNextValidEntry(); + entry.setNext(next); + parent.setNext(entry); + + if(next != null) + { + next.setPrev(entry); + } + entry.setPrev(parent); + } + } + entry.setColour(Colour.RED); + insertFixup(entry); + } + + private void insertFixup(SortedQueueEntryImpl entry) + { + while(isParentColour(entry, Colour.RED)) + { + final SortedQueueEntryImpl grandparent = nodeGrandparent(entry); + + if(nodeParent(entry) == leftChild(grandparent)) + { + final SortedQueueEntryImpl y = rightChild(grandparent); + if(isNodeColour(y, Colour.RED)) + { + setColour(nodeParent(entry), Colour.BLACK); + setColour(y, Colour.BLACK); + setColour(grandparent, Colour.RED); + entry = grandparent; + } + else + { + if(entry == rightChild(nodeParent(entry))) + { + entry = nodeParent(entry); + leftRotate(entry); + } + setColour(nodeParent(entry), Colour.BLACK); + setColour(nodeGrandparent(entry), Colour.RED); + rightRotate(nodeGrandparent(entry)); + } + } + else + { + final SortedQueueEntryImpl y = leftChild(grandparent); + if(isNodeColour(y, Colour.RED)) + { + setColour(nodeParent(entry), Colour.BLACK); + setColour(y, Colour.BLACK); + setColour(grandparent, Colour.RED); + entry = grandparent; + } + else + { + if(entry == leftChild(nodeParent(entry))) + { + entry = nodeParent(entry); + rightRotate(entry); + } + setColour(nodeParent(entry), Colour.BLACK); + setColour(nodeGrandparent(entry), Colour.RED); + leftRotate(nodeGrandparent(entry)); + } + } + } + _root.setColour(Colour.BLACK); + } + + private void leftRotate(final SortedQueueEntryImpl entry) + { + if(entry != null) + { + final SortedQueueEntryImpl rightChild = rightChild(entry); + entry.setRight(rightChild.getLeft()); + if(entry.getRight() != null) + { + entry.getRight().setParent(entry); + } + rightChild.setParent(entry.getParent()); + if(entry.getParent() == null) + { + _root = rightChild; + } + else if(entry == entry.getParent().getLeft()) + { + entry.getParent().setLeft(rightChild); + } + else + { + entry.getParent().setRight(rightChild); + } + rightChild.setLeft(entry); + entry.setParent(rightChild); + } + } + + private void rightRotate(final SortedQueueEntryImpl entry) + { + if(entry != null) + { + final SortedQueueEntryImpl leftChild = leftChild(entry); + entry.setLeft(leftChild.getRight()); + if(entry.getLeft() != null) + { + leftChild.getRight().setParent(entry); + } + leftChild.setParent(entry.getParent()); + if(leftChild.getParent() == null) + { + _root = leftChild; + } + else if(entry == entry.getParent().getRight()) + { + entry.getParent().setRight(leftChild); + } + else + { + entry.getParent().setLeft(leftChild); + } + leftChild.setRight(entry); + entry.setParent(leftChild); + } + } + + private void setColour(final SortedQueueEntryImpl node, final Colour colour) + { + if(node != null) + { + node.setColour(colour); + } + } + + private SortedQueueEntryImpl leftChild(final SortedQueueEntryImpl node) + { + return node == null ? null : node.getLeft(); + } + + private SortedQueueEntryImpl rightChild(final SortedQueueEntryImpl node) + { + return node == null ? null : node.getRight(); + } + + private SortedQueueEntryImpl nodeParent(final SortedQueueEntryImpl node) + { + return node == null ? null : node.getParent(); + } + + private SortedQueueEntryImpl nodeGrandparent(final SortedQueueEntryImpl node) + { + return nodeParent(nodeParent(node)); + } + + private boolean isParentColour(final SortedQueueEntryImpl node, final SortedQueueEntryImpl.Colour colour) + { + + return node != null && isNodeColour(node.getParent(), colour); + } + + protected boolean isNodeColour(final SortedQueueEntryImpl node, final SortedQueueEntryImpl.Colour colour) + { + return (node == null ? Colour.BLACK : node.getColour()) == colour; + } + + @Override + public SortedQueueEntryImpl next(final SortedQueueEntryImpl node) + { + synchronized(_lock) + { + if(node.isDispensed() && _head != node) + { + SortedQueueEntryImpl current = _head; + SortedQueueEntryImpl next; + while(current != null) + { + next = current.getNextValidEntry(); + if(current.compareTo(node)>0 && !current.isDispensed()) + { + break; + } + else + { + current = next; + } + } + return current; + } + else + { + return node.getNextValidEntry(); + } + } + } + + @Override + public QueueEntryIterator<SortedQueueEntryImpl> iterator() + { + return new QueueEntryIteratorImpl(_head); + } + + @Override + public SortedQueueEntryImpl getHead() + { + return _head; + } + + protected SortedQueueEntryImpl getRoot() + { + return _root; + } + + @Override + public void entryDeleted(final SortedQueueEntryImpl entry) + { + synchronized(_lock) + { + // If the node to be removed has two children, we swap the position + // of the node and its successor in the tree + if(leftChild(entry) != null && rightChild(entry) != null) + { + swapWithSuccessor(entry); + } + + // Then deal with the easy doubly linked list deletion (need to do + // this after the swap as the swap uses next + final SortedQueueEntryImpl prev = entry.getPrev(); + if(prev != null) + { + prev.setNext(entry.getNextValidEntry()); + } + + final SortedQueueEntryImpl next = entry.getNextValidEntry(); + if(next != null) + { + next.setPrev(prev); + } + + // now deal with splicing + final SortedQueueEntryImpl chosenChild; + + if(leftChild(entry) != null) + { + chosenChild = leftChild(entry); + } + else + { + chosenChild = rightChild(entry); + } + + if(chosenChild != null) + { + // we have one child (x), we can move it up to replace x; + chosenChild.setParent(entry.getParent()); + if(chosenChild.getParent() == null) + { + _root = chosenChild; + } + else if(entry == entry.getParent().getLeft()) + { + entry.getParent().setLeft(chosenChild); + } + else + { + entry.getParent().setRight(chosenChild); + } + + entry.setLeft(null); + entry.setRight(null); + entry.setParent(null); + + if(entry.getColour() == Colour.BLACK) + { + deleteFixup(chosenChild); + } + + } + else + { + // no children + if(entry.getParent() == null) + { + // no parent either - the tree is empty + _root = null; + } + else + { + if(entry.getColour() == Colour.BLACK) + { + deleteFixup(entry); + } + + if(entry.getParent() != null) + { + if(entry.getParent().getLeft() == entry) + { + entry.getParent().setLeft(null); + } + else if(entry.getParent().getRight() == entry) + { + entry.getParent().setRight(null); + } + entry.setParent(null); + } + } + } + + } + } + + /** + * Swaps the position of the node in the tree with it's successor + * (that is the node with the next highest key) + * @param entry + */ + private void swapWithSuccessor(final SortedQueueEntryImpl entry) + { + final SortedQueueEntryImpl next = entry.getNextValidEntry(); + final SortedQueueEntryImpl nextParent = next.getParent(); + final SortedQueueEntryImpl nextLeft = next.getLeft(); + final SortedQueueEntryImpl nextRight = next.getRight(); + final Colour nextColour = next.getColour(); + + // Special case - the successor is the right child of the node + if(next == entry.getRight()) + { + next.setParent(entry.getParent()); + if(next.getParent() == null) + { + _root = next; + } + else if(next.getParent().getLeft() == entry) + { + next.getParent().setLeft(next); + } + else + { + next.getParent().setRight(next); + } + + next.setRight(entry); + entry.setParent(next); + next.setLeft(entry.getLeft()); + + if(next.getLeft() != null) + { + next.getLeft().setParent(next); + } + + next.setColour(entry.getColour()); + entry.setColour(nextColour); + entry.setLeft(nextLeft); + + if(nextLeft != null) + { + nextLeft.setParent(entry); + } + entry.setRight(nextRight); + if(nextRight != null) + { + nextRight.setParent(entry); + } + } + else + { + next.setParent(entry.getParent()); + if(next.getParent() == null) + { + _root = next; + } + else if(next.getParent().getLeft() == entry) + { + next.getParent().setLeft(next); + } + else + { + next.getParent().setRight(next); + } + + next.setLeft(entry.getLeft()); + if(next.getLeft() != null) + { + next.getLeft().setParent(next); + } + next.setRight(entry.getRight()); + if(next.getRight() != null) + { + next.getRight().setParent(next); + } + next.setColour(entry.getColour()); + + entry.setParent(nextParent); + if(nextParent.getLeft() == next) + { + nextParent.setLeft(entry); + } + else + { + nextParent.setRight(entry); + } + + entry.setLeft(nextLeft); + if(nextLeft != null) + { + nextLeft.setParent(entry); + } + entry.setRight(nextRight); + if(nextRight != null) + { + nextRight.setParent(entry); + } + entry.setColour(nextColour); + } + } + + private void deleteFixup(SortedQueueEntryImpl entry) + { + int i = 0; + while(entry != null && entry != _root + && isNodeColour(entry, Colour.BLACK)) + { + i++; + + if(i > 1000) + { + return; + } + + if(entry == leftChild(nodeParent(entry))) + { + SortedQueueEntryImpl rightSibling = rightChild(nodeParent(entry)); + if(isNodeColour(rightSibling, Colour.RED)) + { + setColour(rightSibling, Colour.BLACK); + nodeParent(entry).setColour(Colour.RED); + leftRotate(nodeParent(entry)); + rightSibling = rightChild(nodeParent(entry)); + } + + if(isNodeColour(leftChild(rightSibling), Colour.BLACK) + && isNodeColour(rightChild(rightSibling), Colour.BLACK)) + { + setColour(rightSibling, Colour.RED); + entry = nodeParent(entry); + } + else + { + if(isNodeColour(rightChild(rightSibling), Colour.BLACK)) + { + setColour(leftChild(rightSibling), Colour.BLACK); + rightSibling.setColour(Colour.RED); + rightRotate(rightSibling); + rightSibling = rightChild(nodeParent(entry)); + } + setColour(rightSibling, getColour(nodeParent(entry))); + setColour(nodeParent(entry), Colour.BLACK); + setColour(rightChild(rightSibling), Colour.BLACK); + leftRotate(nodeParent(entry)); + entry = _root; + } + } + else + { + SortedQueueEntryImpl leftSibling = leftChild(nodeParent(entry)); + if(isNodeColour(leftSibling, Colour.RED)) + { + setColour(leftSibling, Colour.BLACK); + nodeParent(entry).setColour(Colour.RED); + rightRotate(nodeParent(entry)); + leftSibling = leftChild(nodeParent(entry)); + } + + if(isNodeColour(leftChild(leftSibling), Colour.BLACK) + && isNodeColour(rightChild(leftSibling), Colour.BLACK)) + { + setColour(leftSibling, Colour.RED); + entry = nodeParent(entry); + } + else + { + if(isNodeColour(leftChild(leftSibling), Colour.BLACK)) + { + setColour(rightChild(leftSibling), Colour.BLACK); + leftSibling.setColour(Colour.RED); + leftRotate(leftSibling); + leftSibling = leftChild(nodeParent(entry)); + } + setColour(leftSibling, getColour(nodeParent(entry))); + setColour(nodeParent(entry), Colour.BLACK); + setColour(leftChild(leftSibling), Colour.BLACK); + rightRotate(nodeParent(entry)); + entry = _root; + } + } + } + setColour(entry, Colour.BLACK); + } + + private Colour getColour(final SortedQueueEntryImpl x) + { + return x == null ? null : x.getColour(); + } + + public class QueueEntryIteratorImpl implements QueueEntryIterator<SortedQueueEntryImpl> + { + private SortedQueueEntryImpl _lastNode; + + public QueueEntryIteratorImpl(final SortedQueueEntryImpl startNode) + { + _lastNode = startNode; + } + + public boolean atTail() + { + return next(_lastNode) == null; + } + + public SortedQueueEntryImpl getNode() + { + return _lastNode; + } + + public boolean advance() + { + if(!atTail()) + { + SortedQueueEntryImpl nextNode = next(_lastNode); + while(nextNode.isDispensed() && next(nextNode) != null) + { + nextNode = next(nextNode); + } + _lastNode = nextNode; + return true; + + } + else + { + return false; + } + } + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java new file mode 100644 index 0000000000..7a70795e77 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java @@ -0,0 +1,19 @@ +package org.apache.qpid.server.queue; + +public class SortedQueueEntryListFactory implements QueueEntryListFactory +{ + + private final String _propertyName; + + public SortedQueueEntryListFactory(final String propertyName) + { + _propertyName = propertyName; + } + + @Override + public QueueEntryList<SortedQueueEntryImpl> createQueueEntryList(final AMQQueue queue) + { + return new SortedQueueEntryList(queue, _propertyName); + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java index 8bb5d02b01..a43be30b85 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java @@ -20,17 +20,16 @@ */ package org.apache.qpid.tools.messagestore.commands; +import java.io.UnsupportedEncodingException; +import java.util.LinkedList; +import java.util.List; import org.apache.commons.codec.binary.Hex; -import org.apache.qpid.server.queue.QueueEntryImpl; -import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.queue.SimpleQueueEntryImpl; import org.apache.qpid.tools.messagestore.MessageStoreTool; import org.apache.qpid.tools.utils.Console; -import java.io.UnsupportedEncodingException; -import java.util.LinkedList; -import java.util.List; - public class Dump extends Show { private static final int LINE_SIZE = 8; @@ -259,7 +258,7 @@ public class Dump extends Show String title, boolean routing, boolean headers, boolean messageHeaders) { List<QueueEntry> single = new LinkedList<QueueEntry>(); - single.add(new QueueEntryImpl(null,msg, Long.MIN_VALUE)); + single.add(new SimpleQueueEntryImpl(null,msg, Long.MIN_VALUE)); List<List> routingData = super.createMessageData(null, single, headers, routing, messageHeaders); |
