diff options
| author | Keith Wall <kwall@apache.org> | 2011-11-20 23:37:48 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2011-11-20 23:37:48 +0000 |
| commit | a0fccd9dbfabd912a906225f817cd00072d7fc8d (patch) | |
| tree | 71d457039d047e12ee5bec2f6cc98f457f050bec /java | |
| parent | 33db3cab8c1ec8d82045e557b190a98a2418c565 (diff) | |
| download | qpid-python-a0fccd9dbfabd912a906225f817cd00072d7fc8d.tar.gz | |
QPID-3622: Add Sorted Queue funtionality
Applied patch from Andrew MacBean <andymacbean@gmail.com> and Oleksandr Rudyy<orudyy@gmail.com>
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1204295 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
36 files changed, 2749 insertions, 517 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java index 4512de6fb4..31c683b548 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java @@ -62,7 +62,8 @@ public class QueueConfiguration extends ConfigurationPlugin "capacity", "flowResumeCapacity", "lvq", - "lvqKey" + "lvqKey", + "sortKey" }; } @@ -167,6 +168,10 @@ public class QueueConfiguration extends ConfigurationPlugin return getStringValue("lvqKey", null); } + public String getQueueSortKey() + { + return getStringValue("sortKey", null); + } public static class QueueConfig extends ConfigurationPlugin { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java index 371ae0de50..2c04a626ff 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java @@ -20,71 +20,25 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.subscription.SubscriptionList; -import org.apache.qpid.server.virtualhost.VirtualHost; - import java.util.Map; +import org.apache.qpid.server.virtualhost.VirtualHost; -public class AMQPriorityQueue extends SimpleAMQQueue +public class AMQPriorityQueue extends OutOfOrderQueue { - protected AMQPriorityQueue(final AMQShortString name, - final boolean durable, - final AMQShortString owner, - final boolean autoDelete, - boolean exclusive, - final VirtualHost virtualHost, - int priorities, Map<String, Object> arguments) - { - super(name, durable, owner, autoDelete, exclusive, virtualHost,new PriorityQueueList.Factory(priorities), arguments); - } - - public AMQPriorityQueue(String queueName, - boolean durable, - String owner, - boolean autoDelete, - boolean exclusive, VirtualHost virtualHost, int priorities, Map<String,Object> arguments) + protected AMQPriorityQueue(final String name, + final boolean durable, + final String owner, + final boolean autoDelete, + boolean exclusive, + final VirtualHost virtualHost, + Map<String, Object> arguments, + int priorities) { - this(queueName == null ? null : new AMQShortString(queueName), durable, owner == null ? null : new AMQShortString(owner), - autoDelete, exclusive,virtualHost, priorities, arguments); + super(name, durable, owner, autoDelete, exclusive, virtualHost, new PriorityQueueList.Factory(priorities), arguments); } public int getPriorities() { return ((PriorityQueueList) _entries).getPriorities(); } - - @Override - protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry) - { - // check that all subscriptions are not in advance of the entry - SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator(); - while(subIter.advance() && entry.isAvailable()) - { - final Subscription subscription = subIter.getNode().getSubscription(); - if(!subscription.isClosed()) - { - QueueContext context = (QueueContext) subscription.getQueueContext(); - if(context != null) - { - QueueEntry subnode = context._lastSeenEntry; - QueueEntry released = context._releasedEntry; - while(subnode != null && entry.compareTo(subnode) < 0 && entry.isAvailable() && (released == null || released.compareTo(entry) < 0)) - { - if(QueueContext._releasedUpdater.compareAndSet(context,released,entry)) - { - break; - } - else - { - subnode = context._lastSeenEntry; - released = context._releasedEntry; - } - } - } - } - - } - } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index bee55118ba..e4a6f01930 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -20,22 +20,22 @@ */ package org.apache.qpid.server.queue; +import java.util.HashMap; +import java.util.Map; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.configuration.QueueConfiguration; - -import java.util.Map; -import java.util.HashMap; +import org.apache.qpid.server.virtualhost.VirtualHost; public class AMQQueueFactory { - public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities"); + public static final String X_QPID_PRIORITIES = "x-qpid-priorities"; public static final String QPID_LVQ_KEY = "qpid.LVQ_key"; public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue"; public static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key"; + public static final String QPID_QUEUE_SORT_KEY = "qpid.queue_sort_key"; private abstract static class QueueProperty { @@ -157,9 +157,11 @@ public class AMQQueueFactory String description = "Permission denied: queue-name '" + queueName + "'"; throw new AMQSecurityException(description); } - + int priorities = 1; String conflationKey = null; + String sortingKey = null; + if(arguments != null) { if(arguments.containsKey(QPID_LAST_VALUE_QUEUE) || arguments.containsKey(QPID_LAST_VALUE_QUEUE_KEY)) @@ -170,24 +172,32 @@ public class AMQQueueFactory conflationKey = QPID_LVQ_KEY; } } - else if(arguments.containsKey(X_QPID_PRIORITIES.toString())) + else if(arguments.containsKey(X_QPID_PRIORITIES)) { - Object prioritiesObj = arguments.get(X_QPID_PRIORITIES.toString()); + Object prioritiesObj = arguments.get(X_QPID_PRIORITIES); if(prioritiesObj instanceof Number) { priorities = ((Number)prioritiesObj).intValue(); } } + else if(arguments.containsKey(QPID_QUEUE_SORT_KEY)) + { + sortingKey = (String)arguments.get(QPID_QUEUE_SORT_KEY); + } } AMQQueue q; - if(conflationKey != null) + if(sortingKey != null) + { + q = new SortedQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, sortingKey); + } + else if(conflationKey != null) { q = new ConflationQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, conflationKey); } else if(priorities > 1) { - q = new AMQPriorityQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, priorities, arguments); + q = new AMQPriorityQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, priorities); } else { @@ -223,26 +233,22 @@ public class AMQQueueFactory boolean exclusive = config.getExclusive(); String owner = config.getOwner(); Map<String,Object> arguments = null; + if(config.isLVQ() || config.getLVQKey() != null) { - arguments = new HashMap<String,Object>(); arguments.put(QPID_LAST_VALUE_QUEUE, 1); arguments.put(QPID_LAST_VALUE_QUEUE_KEY, config.getLVQKey() == null ? QPID_LVQ_KEY : config.getLVQKey()); } - else + else if (config.getPriority() || config.getPriorities() > 0) { - boolean priority = config.getPriority(); - int priorities = config.getPriorities(); - if(priority || priorities > 0) - { - arguments = new HashMap<String,Object>(); - if (priorities < 0) - { - priorities = 10; - } - arguments.put("x-qpid-priorities", priorities); - } + arguments = new HashMap<String,Object>(); + arguments.put(X_QPID_PRIORITIES, config.getPriorities() < 0 ? 10 : config.getPriorities()); + } + else if (config.getQueueSortKey() != null && !"".equals(config.getQueueSortKey())) + { + arguments = new HashMap<String,Object>(); + arguments.put(QPID_QUEUE_SORT_KEY, config.getQueueSortKey()); } AMQQueue q = createAMQQueueImpl(queueName, durable, owner, autodelete, exclusive, host, arguments); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java index 2c1883e763..c4762c98c9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java @@ -54,7 +54,7 @@ public class ConflationQueueList extends SimpleQueueEntryList @Override - public QueueEntry add(final ServerMessage message) + public ConflationQueueEntry add(final ServerMessage message) { ConflationQueueEntry entry = (ConflationQueueEntry) (super.add(message)); AtomicReference<QueueEntry> latestValueReference = null; @@ -117,7 +117,7 @@ public class ConflationQueueList extends SimpleQueueEntryList } } - private final class ConflationQueueEntry extends QueueEntryImpl + private final class ConflationQueueEntry extends SimpleQueueEntryImpl { @@ -158,7 +158,7 @@ public class ConflationQueueList extends SimpleQueueEntryList _conflationKey = conflationKey; } - public QueueEntryList createQueueEntryList(AMQQueue queue) + public ConflationQueueList createQueueEntryList(AMQQueue queue) { return new ConflationQueueList(queue, _conflationKey); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java new file mode 100644 index 0000000000..b16d1eb8e3 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java @@ -0,0 +1,53 @@ +package org.apache.qpid.server.queue; + +import java.util.Map; +import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.subscription.SubscriptionList; +import org.apache.qpid.server.virtualhost.VirtualHost; + +public abstract class OutOfOrderQueue extends SimpleAMQQueue +{ + protected OutOfOrderQueue(String name, boolean durable, String owner, + boolean autoDelete, boolean exclusive, VirtualHost virtualHost, + QueueEntryListFactory entryListFactory, Map<String, Object> arguments) + { + super(name, durable, owner, autoDelete, exclusive, virtualHost, entryListFactory, arguments); + } + + @Override + protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry) + { + // check that all subscriptions are not in advance of the entry + SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator(); + while(subIter.advance() && !entry.isAcquired()) + { + final Subscription subscription = subIter.getNode().getSubscription(); + if(!subscription.isClosed()) + { + QueueContext context = (QueueContext) subscription.getQueueContext(); + if(context != null) + { + QueueEntry subnode = context._lastSeenEntry; + QueueEntry released = context._releasedEntry; + + while(subnode != null && entry.compareTo(subnode) < 0 && !entry.isAcquired() + && (released == null || released.compareTo(entry) > 0)) + { + if(QueueContext._releasedUpdater.compareAndSet(context,released,entry)) + { + break; + } + else + { + subnode = context._lastSeenEntry; + released = context._releasedEntry; + } + + } + } + } + + } + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java index 0c6b84d2b6..79d3ab5bd0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java @@ -20,21 +20,19 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.framing.CommonContentHeaderProperties; -import org.apache.qpid.AMQException; import org.apache.qpid.server.message.ServerMessage; -public class PriorityQueueList implements QueueEntryList +public class PriorityQueueList implements QueueEntryList<SimpleQueueEntryImpl> { private final AMQQueue _queue; - private final QueueEntryList[] _priorityLists; + private final SimpleQueueEntryList[] _priorityLists; private final int _priorities; private final int _priorityOffset; public PriorityQueueList(AMQQueue queue, int priorities) { _queue = queue; - _priorityLists = new QueueEntryList[priorities]; + _priorityLists = new SimpleQueueEntryList[priorities]; _priorities = priorities; _priorityOffset = 5-((priorities + 1)/2); for(int i = 0; i < priorities; i++) @@ -53,7 +51,7 @@ public class PriorityQueueList implements QueueEntryList return _queue; } - public QueueEntry add(ServerMessage message) + public SimpleQueueEntryImpl add(ServerMessage message) { int index = message.getMessageHeader().getPriority() - _priorityOffset; if(index >= _priorities) @@ -68,31 +66,30 @@ public class PriorityQueueList implements QueueEntryList } - public QueueEntry next(QueueEntry node) + public SimpleQueueEntryImpl next(SimpleQueueEntryImpl node) { - QueueEntryImpl nodeImpl = (QueueEntryImpl)node; - QueueEntry next = nodeImpl.getNext(); + SimpleQueueEntryImpl next = node.getNextValidEntry(); if(next == null) { - QueueEntryList nodeEntryList = nodeImpl.getQueueEntryList(); + final QueueEntryList<?> nodeEntryList = node.getQueueEntryList(); int index; for(index = _priorityLists.length-1; _priorityLists[index] != nodeEntryList; index--); while(next == null && index != 0) { index--; - next = ((QueueEntryImpl)_priorityLists[index].getHead()).getNext(); + next = _priorityLists[index].getHead().getNextValidEntry(); } } return next; } - private final class PriorityQueueEntryListIterator implements QueueEntryIterator + private final class PriorityQueueEntryListIterator implements QueueEntryIterator<SimpleQueueEntryImpl> { - private final QueueEntryIterator[] _iterators = new QueueEntryIterator[ _priorityLists.length ]; - private QueueEntry _lastNode; + private final SimpleQueueEntryList.QueueEntryIteratorImpl[] _iterators = new SimpleQueueEntryList.QueueEntryIteratorImpl[ _priorityLists.length ]; + private SimpleQueueEntryImpl _lastNode; PriorityQueueEntryListIterator() { @@ -116,7 +113,7 @@ public class PriorityQueueList implements QueueEntryList return true; } - public QueueEntry getNode() + public SimpleQueueEntryImpl getNode() { return _lastNode; } @@ -135,16 +132,21 @@ public class PriorityQueueList implements QueueEntryList } } - public QueueEntryIterator iterator() + public PriorityQueueEntryListIterator iterator() { return new PriorityQueueEntryListIterator(); } - public QueueEntry getHead() + public SimpleQueueEntryImpl getHead() { return _priorityLists[_priorities-1].getHead(); } + public void entryDeleted(final SimpleQueueEntryImpl queueEntry) + { + + } + static class Factory implements QueueEntryListFactory { private final int _priorities; @@ -154,7 +156,7 @@ public class PriorityQueueList implements QueueEntryList _priorities = priorities; } - public QueueEntryList createQueueEntryList(AMQQueue queue) + public PriorityQueueList createQueueEntryList(AMQQueue queue) { return new PriorityQueueList(queue, _priorities); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index be29245901..c1fb0258fa 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -214,6 +214,10 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable boolean isQueueDeleted(); + QueueEntry getNextNode(); + + QueueEntry getNextValidEntry(); + void addStateChangeListener(StateChangeListener listener); boolean removeStateChangeListener(StateChangeListener listener); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 3d011b99c0..ee1d214c1f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -38,16 +38,11 @@ import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; - -public class QueueEntryImpl implements QueueEntry +public abstract class QueueEntryImpl implements QueueEntry { - - /** - * Used for debugging purposes. - */ private static final Logger _log = Logger.getLogger(QueueEntryImpl.class); - private final SimpleQueueEntryList _queueEntryList; + private final QueueEntryList _queueEntryList; private MessageReference _message; @@ -80,22 +75,20 @@ public class QueueEntryImpl implements QueueEntry private volatile long _entryId; - volatile QueueEntryImpl _next; - private static final int DELIVERED_TO_CONSUMER = 1; private static final int REDELIVERED = 2; private volatile int _deliveryState; - QueueEntryImpl(SimpleQueueEntryList queueEntryList) + public QueueEntryImpl(QueueEntryList<?> queueEntryList) { this(queueEntryList,null,Long.MIN_VALUE); _state = DELETED_STATE; } - public QueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message, final long entryId) + public QueueEntryImpl(QueueEntryList<?> queueEntryList, ServerMessage message, final long entryId) { _queueEntryList = queueEntryList; @@ -104,7 +97,7 @@ public class QueueEntryImpl implements QueueEntry _entryIdUpdater.set(this, entryId); } - public QueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message) + public QueueEntryImpl(QueueEntryList<?> queueEntryList, ServerMessage message) { _queueEntryList = queueEntryList; _message = message == null ? null : message.newReference(); @@ -316,16 +309,15 @@ public class QueueEntryImpl implements QueueEntry public Subscription getDeliveredSubscription() { - EntryState state = _state; - if (state instanceof SubscriptionAcquiredState) - { - return ((SubscriptionAcquiredState) state).getSubscription(); - } - else - { - return null; - } - + EntryState state = _state; + if (state instanceof SubscriptionAcquiredState) + { + return ((SubscriptionAcquiredState) state).getSubscription(); + } + else + { + return null; + } } public void reject() @@ -497,33 +489,6 @@ public class QueueEntryImpl implements QueueEntry return getEntryId() > other.getEntryId() ? 1 : getEntryId() < other.getEntryId() ? -1 : 0; } - public QueueEntryImpl getNext() - { - - QueueEntryImpl next = nextNode(); - while(next != null && next.isDispensed() ) - { - - final QueueEntryImpl newNext = next.nextNode(); - if(newNext != null) - { - SimpleQueueEntryList._nextUpdater.compareAndSet(this,next, newNext); - next = nextNode(); - } - else - { - next = null; - } - - } - return next; - } - - QueueEntryImpl nextNode() - { - return _next; - } - public boolean isDeleted() { return _state == DELETED_STATE; @@ -535,7 +500,7 @@ public class QueueEntryImpl implements QueueEntry if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE)) { - _queueEntryList.advanceHead(); + _queueEntryList.entryDeleted(this); return true; } else diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java index c5c115a2d1..73ebb0f300 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java @@ -20,11 +20,11 @@ */ package org.apache.qpid.server.queue; -public interface QueueEntryIterator +public interface QueueEntryIterator<QE extends QueueEntry> { boolean atTail(); - QueueEntry getNode(); + QE getNode(); boolean advance(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java index b4042ce02c..77c4b912e0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java @@ -22,15 +22,17 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.message.ServerMessage; -public interface QueueEntryList +public interface QueueEntryList<Q extends QueueEntry> { AMQQueue getQueue(); - QueueEntry add(ServerMessage message); + Q add(ServerMessage message); - QueueEntry next(QueueEntry node); + Q next(Q node); - QueueEntryIterator iterator(); + QueueEntryIterator<Q> iterator(); - QueueEntry getHead(); + Q getHead(); + + void entryDeleted(Q queueEntry); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java new file mode 100644 index 0000000000..0707dc045c --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java @@ -0,0 +1,71 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.message.ServerMessage; + +public class SimpleQueueEntryImpl extends QueueEntryImpl +{ + volatile SimpleQueueEntryImpl _next; + + public SimpleQueueEntryImpl(SimpleQueueEntryList queueEntryList) + { + super(queueEntryList); + } + + public SimpleQueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message, final long entryId) + { + super(queueEntryList, message, entryId); + } + + public SimpleQueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message) + { + super(queueEntryList, message); + } + + public SimpleQueueEntryImpl getNextNode() + { + return _next; + } + + public SimpleQueueEntryImpl getNextValidEntry() + { + + SimpleQueueEntryImpl next = getNextNode(); + while(next != null && next.isDispensed()) + { + + final SimpleQueueEntryImpl newNext = next.getNextNode(); + if(newNext != null) + { + SimpleQueueEntryList._nextUpdater.compareAndSet(this,next, newNext); + next = getNextNode(); + } + else + { + next = null; + } + + } + return next; + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java index 46baab8c85..0bb5dcc219 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java @@ -1,10 +1,3 @@ -package org.apache.qpid.server.queue; - -import org.apache.qpid.server.message.ServerMessage; - -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import java.util.concurrent.atomic.AtomicLong; - /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -25,25 +18,31 @@ import java.util.concurrent.atomic.AtomicLong; * under the License. * */ -public class SimpleQueueEntryList implements QueueEntryList +package org.apache.qpid.server.queue; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import org.apache.qpid.server.message.ServerMessage; + +public class SimpleQueueEntryList implements QueueEntryList<SimpleQueueEntryImpl> { - private final QueueEntryImpl _head; + private final SimpleQueueEntryImpl _head; - private volatile QueueEntryImpl _tail; + private volatile SimpleQueueEntryImpl _tail; - static final AtomicReferenceFieldUpdater<SimpleQueueEntryList, QueueEntryImpl> + static final AtomicReferenceFieldUpdater<SimpleQueueEntryList, SimpleQueueEntryImpl> _tailUpdater = AtomicReferenceFieldUpdater.newUpdater - (SimpleQueueEntryList.class, QueueEntryImpl.class, "_tail"); + (SimpleQueueEntryList.class, SimpleQueueEntryImpl.class, "_tail"); private final AMQQueue _queue; - static final AtomicReferenceFieldUpdater<QueueEntryImpl, QueueEntryImpl> + static final AtomicReferenceFieldUpdater<SimpleQueueEntryImpl, SimpleQueueEntryImpl> _nextUpdater = AtomicReferenceFieldUpdater.newUpdater - (QueueEntryImpl.class, QueueEntryImpl.class, "_next"); + (SimpleQueueEntryImpl.class, SimpleQueueEntryImpl.class, "_next"); private AtomicLong _scavenges = new AtomicLong(0L); private final long _scavengeCount = Integer.getInteger("qpid.queue.scavenge_count", 50); @@ -52,14 +51,14 @@ public class SimpleQueueEntryList implements QueueEntryList public SimpleQueueEntryList(AMQQueue queue) { _queue = queue; - _head = new QueueEntryImpl(this); + _head = new SimpleQueueEntryImpl(this); _tail = _head; } void advanceHead() { - QueueEntryImpl next = _head.nextNode(); - QueueEntryImpl newNext = _head.getNext(); + SimpleQueueEntryImpl next = _head.getNextNode(); + SimpleQueueEntryImpl newNext = _head.getNextValidEntry(); if (next == newNext) { @@ -73,11 +72,11 @@ public class SimpleQueueEntryList implements QueueEntryList void scavenge() { - QueueEntryImpl next = _head.getNext(); + SimpleQueueEntryImpl next = _head.getNextValidEntry(); while (next != null) { - next = next.getNext(); + next = next.getNextValidEntry(); } } @@ -88,13 +87,13 @@ public class SimpleQueueEntryList implements QueueEntryList } - public QueueEntry add(ServerMessage message) + public SimpleQueueEntryImpl add(ServerMessage message) { - QueueEntryImpl node = createQueueEntry(message); + SimpleQueueEntryImpl node = createQueueEntry(message); for (;;) { - QueueEntryImpl tail = _tail; - QueueEntryImpl next = tail.nextNode(); + SimpleQueueEntryImpl tail = _tail; + SimpleQueueEntryImpl next = tail.getNextNode(); if (tail == _tail) { if (next == null) @@ -115,23 +114,22 @@ public class SimpleQueueEntryList implements QueueEntryList } } - protected QueueEntryImpl createQueueEntry(ServerMessage message) + protected SimpleQueueEntryImpl createQueueEntry(ServerMessage message) { - return new QueueEntryImpl(this, message); + return new SimpleQueueEntryImpl(this, message); } - public QueueEntry next(QueueEntry node) + public SimpleQueueEntryImpl next(SimpleQueueEntryImpl node) { - return ((QueueEntryImpl)node).getNext(); + return node.getNextValidEntry(); } - - public static class QueueEntryIteratorImpl implements QueueEntryIterator + public static class QueueEntryIteratorImpl implements QueueEntryIterator<SimpleQueueEntryImpl> { - private QueueEntryImpl _lastNode; + private SimpleQueueEntryImpl _lastNode; - QueueEntryIteratorImpl(QueueEntryImpl startNode) + QueueEntryIteratorImpl(SimpleQueueEntryImpl startNode) { _lastNode = startNode; } @@ -139,14 +137,12 @@ public class SimpleQueueEntryList implements QueueEntryList public boolean atTail() { - return _lastNode.nextNode() == null; + return _lastNode.getNextNode() == null; } - public QueueEntry getNode() + public SimpleQueueEntryImpl getNode() { - return _lastNode; - } public boolean advance() @@ -154,10 +150,10 @@ public class SimpleQueueEntryList implements QueueEntryList if(!atTail()) { - QueueEntryImpl nextNode = _lastNode.nextNode(); - while(nextNode.isDispensed() && nextNode.nextNode() != null) + SimpleQueueEntryImpl nextNode = _lastNode.getNextNode(); + while(nextNode.isDispensed() && nextNode.getNextNode() != null) { - nextNode = nextNode.nextNode(); + nextNode = nextNode.getNextNode(); } _lastNode = nextNode; return true; @@ -173,21 +169,26 @@ public class SimpleQueueEntryList implements QueueEntryList } - public QueueEntryIterator iterator() + public QueueEntryIteratorImpl iterator() { return new QueueEntryIteratorImpl(_head); } - public QueueEntry getHead() + public SimpleQueueEntryImpl getHead() { return _head; } + public void entryDeleted(SimpleQueueEntryImpl queueEntry) + { + advanceHead(); + } + static class Factory implements QueueEntryListFactory { - public QueueEntryList createQueueEntryList(AMQQueue queue) + public SimpleQueueEntryList createQueueEntryList(AMQQueue queue) { return new SimpleQueueEntryList(queue); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java new file mode 100644 index 0000000000..3f02442704 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java @@ -0,0 +1,30 @@ +package org.apache.qpid.server.queue; + +import java.util.Map; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.virtualhost.VirtualHost; + +public class SortedQueue extends OutOfOrderQueue +{ + private String _sortedPropertyName; + + protected SortedQueue(final String name, final boolean durable, + final String owner, final boolean autoDelete, final boolean exclusive, + final VirtualHost virtualHost, Map<String, Object> arguments, String sortedPropertyName) + { + super(name, durable, owner, autoDelete, exclusive, virtualHost, + new SortedQueueEntryListFactory(sortedPropertyName), arguments); + this._sortedPropertyName = sortedPropertyName; + } + + public String getSortedPropertyName() + { + return _sortedPropertyName; + } + + public synchronized void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException + { + super.enqueue(message, action); + } +}
\ No newline at end of file diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryImpl.java new file mode 100644 index 0000000000..1052adbe67 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryImpl.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.message.ServerMessage; + +/** + * An implementation of QueueEntryImpl to be used in SortedQueueEntryList. + */ +public class SortedQueueEntryImpl extends QueueEntryImpl +{ + public static enum Colour + { + RED, BLACK + }; + + private volatile SortedQueueEntryImpl _next; + private SortedQueueEntryImpl _prev; + private String _key; + + private Colour _colour = Colour.BLACK; + private SortedQueueEntryImpl _parent; + private SortedQueueEntryImpl _left; + private SortedQueueEntryImpl _right; + + public SortedQueueEntryImpl(final SortedQueueEntryList queueEntryList) + { + super(queueEntryList); + } + + public SortedQueueEntryImpl(final SortedQueueEntryList queueEntryList, + final ServerMessage message, final long entryId) + { + super(queueEntryList, message, entryId); + } + + @Override + public int compareTo(final QueueEntry o) + { + final String otherKey = ((SortedQueueEntryImpl) o)._key; + final int compare = _key == null ? (otherKey == null ? 0 : -1) : otherKey == null ? 1 : _key.compareTo(otherKey); + return compare == 0 ? super.compareTo(o) : compare; + } + + public Colour getColour() + { + return _colour; + } + + public String getKey() + { + return _key; + } + + public SortedQueueEntryImpl getLeft() + { + return _left; + } + + public SortedQueueEntryImpl getNextNode() + { + return _next; + } + + @Override + public SortedQueueEntryImpl getNextValidEntry() + { + return getNextNode(); + } + + public SortedQueueEntryImpl getParent() + { + return _parent; + } + + public SortedQueueEntryImpl getPrev() + { + return _prev; + } + + public SortedQueueEntryImpl getRight() + { + return _right; + } + + public void setColour(final Colour colour) + { + _colour = colour; + } + + public void setKey(final String key) + { + _key = key; + } + + public void setLeft(final SortedQueueEntryImpl left) + { + _left = left; + } + + public void setNext(final SortedQueueEntryImpl next) + { + _next = next; + } + + public void setParent(final SortedQueueEntryImpl parent) + { + _parent = parent; + } + + public void setPrev(final SortedQueueEntryImpl prev) + { + _prev = prev; + } + + public void setRight(final SortedQueueEntryImpl right) + { + _right = right; + } + + @Override + public String toString() + { + return "(" + (_colour == Colour.RED ? "Red," : "Black,") + _key + ")"; + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java new file mode 100644 index 0000000000..5f8ab16c06 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java @@ -0,0 +1,665 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.message.ServerMessage; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.server.queue.SortedQueueEntryImpl.Colour; +import org.apache.qpid.server.store.StoreContext; + +/** + * A sorted implementation of QueueEntryList. + * Uses the red/black tree algorithm specified in "Introduction to Algorithms". + * ISBN-10: 0262033844 + * ISBN-13: 978-0262033848 + * @see http://en.wikipedia.org/wiki/Red-black_tree + */ +public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl> +{ + private final SortedQueueEntryImpl _head; + private SortedQueueEntryImpl _root; + private long _entryId = Long.MIN_VALUE; + private final Object _lock = new Object(); + private final AMQQueue _queue; + private final String _propertyName; + + public SortedQueueEntryList(final AMQQueue queue, final String propertyName) + { + _queue = queue; + _head = new SortedQueueEntryImpl(this); + _propertyName = propertyName; + } + + @Override + public AMQQueue getQueue() + { + return _queue; + } + + @Override + public SortedQueueEntryImpl add(final ServerMessage message) + { + synchronized(_lock) + { + String key = null; + final Object val = message.getMessageHeader().getHeader(_propertyName); + if(val != null) + { + key = val.toString(); + } + + final SortedQueueEntryImpl entry = new SortedQueueEntryImpl(this,message, ++_entryId); + entry.setKey(key); + + insert(entry); + + return entry; + } + } + + /** + * Red Black Tree insert implementation. + * @param entry the entry to insert. + */ + private void insert(final SortedQueueEntryImpl entry) + { + SortedQueueEntryImpl node = _root; + if((node = _root) == null) + { + _root = entry; + _head.setNext(entry); + entry.setPrev(_head); + return; + } + else + { + SortedQueueEntryImpl parent = null; + while(node != null) + { + parent = node; + if(entry.compareTo(node) < 0) + { + node = node.getLeft(); + } + else + { + node = node.getRight(); + } + } + entry.setParent(parent); + + if(entry.compareTo(parent) < 0) + { + parent.setLeft(entry); + final SortedQueueEntryImpl prev = parent.getPrev(); + entry.setNext(parent); + prev.setNext(entry); + entry.setPrev(prev); + parent.setPrev(entry); + } + else + { + parent.setRight(entry); + + final SortedQueueEntryImpl next = parent.getNextValidEntry(); + entry.setNext(next); + parent.setNext(entry); + + if(next != null) + { + next.setPrev(entry); + } + entry.setPrev(parent); + } + } + entry.setColour(Colour.RED); + insertFixup(entry); + } + + private void insertFixup(SortedQueueEntryImpl entry) + { + while(isParentColour(entry, Colour.RED)) + { + final SortedQueueEntryImpl grandparent = nodeGrandparent(entry); + + if(nodeParent(entry) == leftChild(grandparent)) + { + final SortedQueueEntryImpl y = rightChild(grandparent); + if(isNodeColour(y, Colour.RED)) + { + setColour(nodeParent(entry), Colour.BLACK); + setColour(y, Colour.BLACK); + setColour(grandparent, Colour.RED); + entry = grandparent; + } + else + { + if(entry == rightChild(nodeParent(entry))) + { + entry = nodeParent(entry); + leftRotate(entry); + } + setColour(nodeParent(entry), Colour.BLACK); + setColour(nodeGrandparent(entry), Colour.RED); + rightRotate(nodeGrandparent(entry)); + } + } + else + { + final SortedQueueEntryImpl y = leftChild(grandparent); + if(isNodeColour(y, Colour.RED)) + { + setColour(nodeParent(entry), Colour.BLACK); + setColour(y, Colour.BLACK); + setColour(grandparent, Colour.RED); + entry = grandparent; + } + else + { + if(entry == leftChild(nodeParent(entry))) + { + entry = nodeParent(entry); + rightRotate(entry); + } + setColour(nodeParent(entry), Colour.BLACK); + setColour(nodeGrandparent(entry), Colour.RED); + leftRotate(nodeGrandparent(entry)); + } + } + } + _root.setColour(Colour.BLACK); + } + + private void leftRotate(final SortedQueueEntryImpl entry) + { + if(entry != null) + { + final SortedQueueEntryImpl rightChild = rightChild(entry); + entry.setRight(rightChild.getLeft()); + if(entry.getRight() != null) + { + entry.getRight().setParent(entry); + } + rightChild.setParent(entry.getParent()); + if(entry.getParent() == null) + { + _root = rightChild; + } + else if(entry == entry.getParent().getLeft()) + { + entry.getParent().setLeft(rightChild); + } + else + { + entry.getParent().setRight(rightChild); + } + rightChild.setLeft(entry); + entry.setParent(rightChild); + } + } + + private void rightRotate(final SortedQueueEntryImpl entry) + { + if(entry != null) + { + final SortedQueueEntryImpl leftChild = leftChild(entry); + entry.setLeft(leftChild.getRight()); + if(entry.getLeft() != null) + { + leftChild.getRight().setParent(entry); + } + leftChild.setParent(entry.getParent()); + if(leftChild.getParent() == null) + { + _root = leftChild; + } + else if(entry == entry.getParent().getRight()) + { + entry.getParent().setRight(leftChild); + } + else + { + entry.getParent().setLeft(leftChild); + } + leftChild.setRight(entry); + entry.setParent(leftChild); + } + } + + private void setColour(final SortedQueueEntryImpl node, final Colour colour) + { + if(node != null) + { + node.setColour(colour); + } + } + + private SortedQueueEntryImpl leftChild(final SortedQueueEntryImpl node) + { + return node == null ? null : node.getLeft(); + } + + private SortedQueueEntryImpl rightChild(final SortedQueueEntryImpl node) + { + return node == null ? null : node.getRight(); + } + + private SortedQueueEntryImpl nodeParent(final SortedQueueEntryImpl node) + { + return node == null ? null : node.getParent(); + } + + private SortedQueueEntryImpl nodeGrandparent(final SortedQueueEntryImpl node) + { + return nodeParent(nodeParent(node)); + } + + private boolean isParentColour(final SortedQueueEntryImpl node, final SortedQueueEntryImpl.Colour colour) + { + + return node != null && isNodeColour(node.getParent(), colour); + } + + protected boolean isNodeColour(final SortedQueueEntryImpl node, final SortedQueueEntryImpl.Colour colour) + { + return (node == null ? Colour.BLACK : node.getColour()) == colour; + } + + @Override + public SortedQueueEntryImpl next(final SortedQueueEntryImpl node) + { + synchronized(_lock) + { + if(node.isDispensed() && _head != node) + { + SortedQueueEntryImpl current = _head; + SortedQueueEntryImpl next; + while(current != null) + { + next = current.getNextValidEntry(); + if(current.compareTo(node)>0 && !current.isDispensed()) + { + break; + } + else + { + current = next; + } + } + return current; + } + else + { + return node.getNextValidEntry(); + } + } + } + + @Override + public QueueEntryIterator<SortedQueueEntryImpl> iterator() + { + return new QueueEntryIteratorImpl(_head); + } + + @Override + public SortedQueueEntryImpl getHead() + { + return _head; + } + + protected SortedQueueEntryImpl getRoot() + { + return _root; + } + + @Override + public void entryDeleted(final SortedQueueEntryImpl entry) + { + synchronized(_lock) + { + // If the node to be removed has two children, we swap the position + // of the node and its successor in the tree + if(leftChild(entry) != null && rightChild(entry) != null) + { + swapWithSuccessor(entry); + } + + // Then deal with the easy doubly linked list deletion (need to do + // this after the swap as the swap uses next + final SortedQueueEntryImpl prev = entry.getPrev(); + if(prev != null) + { + prev.setNext(entry.getNextValidEntry()); + } + + final SortedQueueEntryImpl next = entry.getNextValidEntry(); + if(next != null) + { + next.setPrev(prev); + } + + // now deal with splicing + final SortedQueueEntryImpl chosenChild; + + if(leftChild(entry) != null) + { + chosenChild = leftChild(entry); + } + else + { + chosenChild = rightChild(entry); + } + + if(chosenChild != null) + { + // we have one child (x), we can move it up to replace x; + chosenChild.setParent(entry.getParent()); + if(chosenChild.getParent() == null) + { + _root = chosenChild; + } + else if(entry == entry.getParent().getLeft()) + { + entry.getParent().setLeft(chosenChild); + } + else + { + entry.getParent().setRight(chosenChild); + } + + entry.setLeft(null); + entry.setRight(null); + entry.setParent(null); + + if(entry.getColour() == Colour.BLACK) + { + deleteFixup(chosenChild); + } + + } + else + { + // no children + if(entry.getParent() == null) + { + // no parent either - the tree is empty + _root = null; + } + else + { + if(entry.getColour() == Colour.BLACK) + { + deleteFixup(entry); + } + + if(entry.getParent() != null) + { + if(entry.getParent().getLeft() == entry) + { + entry.getParent().setLeft(null); + } + else if(entry.getParent().getRight() == entry) + { + entry.getParent().setRight(null); + } + entry.setParent(null); + } + } + } + + } + } + + /** + * Swaps the position of the node in the tree with it's successor + * (that is the node with the next highest key) + * @param entry + */ + private void swapWithSuccessor(final SortedQueueEntryImpl entry) + { + final SortedQueueEntryImpl next = entry.getNextValidEntry(); + final SortedQueueEntryImpl nextParent = next.getParent(); + final SortedQueueEntryImpl nextLeft = next.getLeft(); + final SortedQueueEntryImpl nextRight = next.getRight(); + final Colour nextColour = next.getColour(); + + // Special case - the successor is the right child of the node + if(next == entry.getRight()) + { + next.setParent(entry.getParent()); + if(next.getParent() == null) + { + _root = next; + } + else if(next.getParent().getLeft() == entry) + { + next.getParent().setLeft(next); + } + else + { + next.getParent().setRight(next); + } + + next.setRight(entry); + entry.setParent(next); + next.setLeft(entry.getLeft()); + + if(next.getLeft() != null) + { + next.getLeft().setParent(next); + } + + next.setColour(entry.getColour()); + entry.setColour(nextColour); + entry.setLeft(nextLeft); + + if(nextLeft != null) + { + nextLeft.setParent(entry); + } + entry.setRight(nextRight); + if(nextRight != null) + { + nextRight.setParent(entry); + } + } + else + { + next.setParent(entry.getParent()); + if(next.getParent() == null) + { + _root = next; + } + else if(next.getParent().getLeft() == entry) + { + next.getParent().setLeft(next); + } + else + { + next.getParent().setRight(next); + } + + next.setLeft(entry.getLeft()); + if(next.getLeft() != null) + { + next.getLeft().setParent(next); + } + next.setRight(entry.getRight()); + if(next.getRight() != null) + { + next.getRight().setParent(next); + } + next.setColour(entry.getColour()); + + entry.setParent(nextParent); + if(nextParent.getLeft() == next) + { + nextParent.setLeft(entry); + } + else + { + nextParent.setRight(entry); + } + + entry.setLeft(nextLeft); + if(nextLeft != null) + { + nextLeft.setParent(entry); + } + entry.setRight(nextRight); + if(nextRight != null) + { + nextRight.setParent(entry); + } + entry.setColour(nextColour); + } + } + + private void deleteFixup(SortedQueueEntryImpl entry) + { + int i = 0; + while(entry != null && entry != _root + && isNodeColour(entry, Colour.BLACK)) + { + i++; + + if(i > 1000) + { + return; + } + + if(entry == leftChild(nodeParent(entry))) + { + SortedQueueEntryImpl rightSibling = rightChild(nodeParent(entry)); + if(isNodeColour(rightSibling, Colour.RED)) + { + setColour(rightSibling, Colour.BLACK); + nodeParent(entry).setColour(Colour.RED); + leftRotate(nodeParent(entry)); + rightSibling = rightChild(nodeParent(entry)); + } + + if(isNodeColour(leftChild(rightSibling), Colour.BLACK) + && isNodeColour(rightChild(rightSibling), Colour.BLACK)) + { + setColour(rightSibling, Colour.RED); + entry = nodeParent(entry); + } + else + { + if(isNodeColour(rightChild(rightSibling), Colour.BLACK)) + { + setColour(leftChild(rightSibling), Colour.BLACK); + rightSibling.setColour(Colour.RED); + rightRotate(rightSibling); + rightSibling = rightChild(nodeParent(entry)); + } + setColour(rightSibling, getColour(nodeParent(entry))); + setColour(nodeParent(entry), Colour.BLACK); + setColour(rightChild(rightSibling), Colour.BLACK); + leftRotate(nodeParent(entry)); + entry = _root; + } + } + else + { + SortedQueueEntryImpl leftSibling = leftChild(nodeParent(entry)); + if(isNodeColour(leftSibling, Colour.RED)) + { + setColour(leftSibling, Colour.BLACK); + nodeParent(entry).setColour(Colour.RED); + rightRotate(nodeParent(entry)); + leftSibling = leftChild(nodeParent(entry)); + } + + if(isNodeColour(leftChild(leftSibling), Colour.BLACK) + && isNodeColour(rightChild(leftSibling), Colour.BLACK)) + { + setColour(leftSibling, Colour.RED); + entry = nodeParent(entry); + } + else + { + if(isNodeColour(leftChild(leftSibling), Colour.BLACK)) + { + setColour(rightChild(leftSibling), Colour.BLACK); + leftSibling.setColour(Colour.RED); + leftRotate(leftSibling); + leftSibling = leftChild(nodeParent(entry)); + } + setColour(leftSibling, getColour(nodeParent(entry))); + setColour(nodeParent(entry), Colour.BLACK); + setColour(leftChild(leftSibling), Colour.BLACK); + rightRotate(nodeParent(entry)); + entry = _root; + } + } + } + setColour(entry, Colour.BLACK); + } + + private Colour getColour(final SortedQueueEntryImpl x) + { + return x == null ? null : x.getColour(); + } + + public class QueueEntryIteratorImpl implements QueueEntryIterator<SortedQueueEntryImpl> + { + private SortedQueueEntryImpl _lastNode; + + public QueueEntryIteratorImpl(final SortedQueueEntryImpl startNode) + { + _lastNode = startNode; + } + + public boolean atTail() + { + return next(_lastNode) == null; + } + + public SortedQueueEntryImpl getNode() + { + return _lastNode; + } + + public boolean advance() + { + if(!atTail()) + { + SortedQueueEntryImpl nextNode = next(_lastNode); + while(nextNode.isDispensed() && next(nextNode) != null) + { + nextNode = next(nextNode); + } + _lastNode = nextNode; + return true; + + } + else + { + return false; + } + } + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java new file mode 100644 index 0000000000..7a70795e77 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java @@ -0,0 +1,19 @@ +package org.apache.qpid.server.queue; + +public class SortedQueueEntryListFactory implements QueueEntryListFactory +{ + + private final String _propertyName; + + public SortedQueueEntryListFactory(final String propertyName) + { + _propertyName = propertyName; + } + + @Override + public QueueEntryList<SortedQueueEntryImpl> createQueueEntryList(final AMQQueue queue) + { + return new SortedQueueEntryList(queue, _propertyName); + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java index 8bb5d02b01..a43be30b85 100644 --- a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java +++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java @@ -20,17 +20,16 @@ */ package org.apache.qpid.tools.messagestore.commands; +import java.io.UnsupportedEncodingException; +import java.util.LinkedList; +import java.util.List; import org.apache.commons.codec.binary.Hex; -import org.apache.qpid.server.queue.QueueEntryImpl; -import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.queue.SimpleQueueEntryImpl; import org.apache.qpid.tools.messagestore.MessageStoreTool; import org.apache.qpid.tools.utils.Console; -import java.io.UnsupportedEncodingException; -import java.util.LinkedList; -import java.util.List; - public class Dump extends Show { private static final int LINE_SIZE = 8; @@ -259,7 +258,7 @@ public class Dump extends Show String title, boolean routing, boolean headers, boolean messageHeaders) { List<QueueEntry> single = new LinkedList<QueueEntry>(); - single.add(new QueueEntryImpl(null,msg, Long.MIN_VALUE)); + single.add(new SimpleQueueEntryImpl(null,msg, Long.MIN_VALUE)); List<List> routingData = super.createMessageData(null, single, headers, routing, messageHeaders); diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java index d2f2ae5eea..9941c00499 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java @@ -129,7 +129,19 @@ public class QueueConfigurationTest extends TestCase assertEquals(1, qConf.getMinimumAlertRepeatGap()); } - private VirtualHostConfiguration overrideConfiguration(String property, int value) + public void testSortQueueConfiguration() throws ConfigurationException + { + //Check default value + QueueConfiguration qConf = new QueueConfiguration("test", _emptyConf); + assertNull(qConf.getQueueSortKey()); + + // Check explicit value + final VirtualHostConfiguration vhostConfig = overrideConfiguration("sortKey", "test-sort-key"); + qConf = new QueueConfiguration("test", vhostConfig); + assertEquals("test-sort-key", qConf.getQueueSortKey()); + } + + private VirtualHostConfiguration overrideConfiguration(String property, Object value) throws ConfigurationException { PropertiesConfiguration queueConfig = new PropertiesConfiguration(); @@ -141,5 +153,4 @@ public class QueueConfigurationTest extends TestCase return new VirtualHostConfiguration("test", config); } - } diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 3b7f5f3a51..7b73987abf 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -20,9 +20,17 @@ */ package org.apache.qpid.server.exchange; -import junit.framework.TestCase; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import org.apache.log4j.Logger; - import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -52,17 +60,6 @@ import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.util.InternalBrokerBaseCase; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; - public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase { private static final Logger _log = Logger.getLogger(AbstractHeadersExchangeTestBase.class); @@ -483,6 +480,16 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase { return false; } + + public QueueEntry getNextNode() + { + return null; + } + + public QueueEntry getNextValidEntry() + { + return null; + } }; if(action != null) diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java index 3961b3b355..2ce43052d9 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java @@ -21,12 +21,12 @@ package org.apache.qpid.server.queue; */ import java.util.ArrayList; - +import junit.framework.AssertionFailedError; import org.apache.qpid.AMQException; -import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; -import junit.framework.AssertionFailedError; +import org.apache.qpid.server.message.AMQMessage; public class AMQPriorityQueueTest extends SimpleAMQQueueTest { @@ -35,7 +35,7 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest public void setUp() throws Exception { _arguments = new FieldTable(); - _arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, 3); + _arguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), 3); super.setUp(); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java index 7000df157e..12369bd7d4 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java @@ -20,24 +20,23 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.AMQException; import org.apache.qpid.server.message.AMQMessage; public class MockAMQMessage extends AMQMessage { public MockAMQMessage(long messageId) - throws AMQException { super(new MockStoredMessage(messageId)); } - - + public MockAMQMessage(long messageId, String headerName, Object headerValue) + { + super(new MockStoredMessage(messageId, headerName, headerValue)); + } @Override public long getSize() { return 0l; } - } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java index ab8850c18c..864b9ad368 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java @@ -234,4 +234,14 @@ public class MockQueueEntry implements QueueEntry return false; } + public QueueEntry getNextNode() + { + return null; + } + + public QueueEntry getNextValidEntry() + { + return null; + } + } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java index 7dc491de4d..78ed3e9f34 100755 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.queue; +import org.apache.qpid.framing.FieldTable; + import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TransactionLog; import org.apache.qpid.server.store.StoredMessage; @@ -36,18 +38,32 @@ public class MockStoredMessage implements StoredMessage<MessageMetaData> private MessageMetaData _metaData; private final ByteBuffer _content; - public MockStoredMessage(long messageId) { - this(messageId, new MockMessagePublishInfo(), new ContentHeaderBody(new BasicContentHeaderProperties(), 60)); + this(messageId, (String)null, null); + } + + public MockStoredMessage(long messageId, String headerName, Object headerValue) + { + this(messageId, new MockMessagePublishInfo(), new ContentHeaderBody(new BasicContentHeaderProperties(), 60), headerName, headerValue); } public MockStoredMessage(long messageId, MessagePublishInfo info, ContentHeaderBody chb) { + this(messageId, info, chb, null, null); + } + + public MockStoredMessage(long messageId, MessagePublishInfo info, ContentHeaderBody chb, String headerName, Object headerValue) + { _messageId = messageId; + if (headerName != null) + { + FieldTable headers = new FieldTable(); + headers.setString(headerName, headerValue == null? null :String.valueOf(headerValue)); + ((BasicContentHeaderProperties)chb.getProperties()).setHeaders(headers); + } _metaData = new MessageMetaData(info, chb, 0); _content = ByteBuffer.allocate(_metaData.getContentSize()); - } public MessageMetaData getMetaData() diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java index d8afd8d829..d336132316 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java @@ -19,9 +19,7 @@ package org.apache.qpid.server.queue; import java.lang.reflect.Field; - import junit.framework.TestCase; - import org.apache.qpid.AMQException; import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.queue.QueueEntry.EntryState; @@ -30,18 +28,27 @@ import org.apache.qpid.server.subscription.Subscription; /** * Tests for {@link QueueEntryImpl} - * */ -public class QueueEntryImplTest extends TestCase +public abstract class QueueEntryImplTestBase extends TestCase { // tested entry - private QueueEntryImpl _queueEntry; + protected QueueEntryImpl _queueEntry; + protected QueueEntryImpl _queueEntry2; + protected QueueEntryImpl _queueEntry3; + + public abstract QueueEntryImpl getQueueEntryImpl(int msgid) throws AMQException; + + public abstract void testCompareTo(); + + public abstract void testTraverseWithNoDeletedEntries(); + + public abstract void testTraverseWithDeletedEntries(); public void setUp() throws Exception { - AMQMessage message = new MockAMQMessage(1); - SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test")); - _queueEntry = new QueueEntryImpl(queueEntryList, message, 1); + _queueEntry = getQueueEntryImpl(1); + _queueEntry2 = getQueueEntryImpl(2); + _queueEntry3 = getQueueEntryImpl(3); } public void testAquire() @@ -105,61 +112,6 @@ public class QueueEntryImplTest extends TestCase } /** - * Tests if entries in DEQUQUED or DELETED state are not returned by getNext method. - */ - public void testGetNext() - { - int numberOfEntries = 5; - QueueEntryImpl[] entries = new QueueEntryImpl[numberOfEntries]; - SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test")); - - // create test entries - for(int i = 0; i < numberOfEntries ; i++) - { - AMQMessage message = null;; - try - { - message = new MockAMQMessage(i); - } - catch (AMQException e) - { - fail("Failure to create a mock message:" + e.getMessage()); - } - QueueEntryImpl entry = (QueueEntryImpl)queueEntryList.add(message); - entries[i] = entry; - } - - // test getNext for not acquired entries - for(int i = 0; i < numberOfEntries ; i++) - { - QueueEntryImpl queueEntry = entries[i]; - QueueEntryImpl next = queueEntry.getNext(); - if (i < numberOfEntries - 1) - { - assertEquals("Unexpected entry from QueueEntryImpl#getNext()", entries[i + 1], next); - } - else - { - assertNull("The next entry after the last should be null", next); - } - } - - // delete second - entries[1].acquire(); - entries[1].delete(); - - // dequeue third - entries[2].acquire(); - entries[2].dequeue(); - - QueueEntryImpl next = entries[0].getNext(); - assertEquals("expected forth entry",entries[3], next); - next = next.getNext(); - assertEquals("expected fifth entry", entries[4], next); - next = next.getNext(); - assertNull("The next entry after the last should be null", next); - } - /** * A helper method to put tested object into deleted state and assert the state */ private void delete() @@ -244,4 +196,52 @@ public class QueueEntryImplTest extends TestCase assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(subId)); assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(sub2Id)); } + + /** + * Tests if entries in DEQUQUED or DELETED state are not returned by getNext method. + */ + public void testGetNext() + { + int numberOfEntries = 5; + QueueEntryImpl[] entries = new QueueEntryImpl[numberOfEntries]; + SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test")); + + // create test entries + for(int i = 0; i < numberOfEntries ; i++) + { + AMQMessage message = new MockAMQMessage(i); + QueueEntryImpl entry = (QueueEntryImpl)queueEntryList.add(message); + entries[i] = entry; + } + + // test getNext for not acquired entries + for(int i = 0; i < numberOfEntries ; i++) + { + QueueEntryImpl queueEntry = entries[i]; + QueueEntry next = queueEntry.getNextValidEntry(); + if (i < numberOfEntries - 1) + { + assertEquals("Unexpected entry from QueueEntryImpl#getNext()", entries[i + 1], next); + } + else + { + assertNull("The next entry after the last should be null", next); + } + } + + // delete second + entries[1].acquire(); + entries[1].delete(); + + // dequeue third + entries[2].acquire(); + entries[2].dequeue(); + + QueueEntry next = entries[0].getNextValidEntry(); + assertEquals("expected forth entry",entries[3], next); + next = next.getNextValidEntry(); + assertEquals("expected fifth entry", entries[4], next); + next = next.getNextValidEntry(); + assertNull("The next entry after the last should be null", next); + } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java new file mode 100644 index 0000000000..7a3f6f701c --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import junit.framework.TestCase; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.ServerMessage; + +/** + * Abstract test class for QueueEntryList implementations. + */ +public abstract class QueueEntryListTestBase extends TestCase +{ + protected static final AMQQueue _testQueue = new MockAMQQueue("test"); + public abstract QueueEntryList<QueueEntry> getTestList(); + public abstract long getExpectedFirstMsgId(); + public abstract int getExpectedListLength(); + public abstract ServerMessage getTestMessageToAdd() throws AMQException; + + public void testGetQueue() + { + assertEquals("Unexpected head entry returned by getHead()", getTestList().getQueue(), _testQueue); + } + + /** + * Test to add a message with properties specific to the queue type. + * @see QueueEntryListTestBase#getTestList() + * @see QueueEntryListTestBase#getTestMessageToAdd() + * @throws AMQException + */ + public void testAddSpecificMessage() throws AMQException + { + final QueueEntryList<QueueEntry> list = getTestList(); + list.add(getTestMessageToAdd()); + + final QueueEntryIterator<?> iter = list.iterator(); + int count = 0; + while(iter.advance()) + { + iter.getNode(); + count++; + } + assertEquals("List did not grow by one entry after an add", getExpectedListLength() + 1, count); + } + + /** + * Test to add a generic mock message. + * @see QueueEntryListTestBase#getTestList() + * @see QueueEntryListTestBase#getExpectedListLength() + * @see MockAMQMessage + * @throws AMQException + */ + public void testAddGenericMessage() throws AMQException + { + final QueueEntryList<QueueEntry> list = getTestList(); + list.add(new MockAMQMessage(666)); + + final QueueEntryIterator<?> iter = list.iterator(); + int count = 0; + while(iter.advance()) + { + iter.getNode(); + count++; + } + assertEquals("List did not grow by one entry after a generic message added", getExpectedListLength() + 1, count); + + } + + /** + * Test for getting the next element in a queue list. + * @see QueueEntryListTestBase#getTestList() + * @see QueueEntryListTestBase#getExpectedListLength() + */ + public void testListNext() + { + final QueueEntryList<QueueEntry> entryList = getTestList(); + QueueEntry entry = entryList.getHead(); + int count = 0; + while(entryList.next(entry) != null) + { + entry = entryList.next(entry); + count++; + } + assertEquals("Get next didnt get all the list entries", getExpectedListLength(), count); + } + + /** + * Basic test for the associated QueueEntryIterator implementation. + * @see QueueEntryListTestBase#getTestList() + * @see QueueEntryListTestBase#getExpectedListLength() + */ + public void testIterator() + { + final QueueEntryIterator<?> iter = getTestList().iterator(); + int count = 0; + while(iter.advance()) + { + iter.getNode(); + count++; + } + assertEquals("Iterator invalid", getExpectedListLength(), count); + } + + /** + * Test for associated QueueEntryIterator implementation that checks it handles "removed" messages. + * @see QueueEntryListTestBase#getTestList() + * @see QueueEntryListTestBase#getExpectedListLength() + */ + public void testDequedMessagedNotPresentInIterator() throws Exception + { + final int numberOfMessages = getExpectedListLength(); + final QueueEntryList<QueueEntry> entryList = getTestList(); + + // dequeue all even messages + final QueueEntryIterator<?> it1 = entryList.iterator(); + int counter = 0; + while (it1.advance()) + { + final QueueEntry queueEntry = it1.getNode(); + if(counter++ % 2 == 0) + { + queueEntry.acquire(); + queueEntry.dequeue(); + } + } + + // iterate and check that dequeued messages are not returned by iterator + final QueueEntryIterator<?> it2 = entryList.iterator(); + int counter2 = 0; + while(it2.advance()) + { + it2.getNode(); + counter2++; + } + final int expectedNumber = numberOfMessages / 2; + assertEquals("Expected " + expectedNumber + " number of entries in iterator but got " + counter2, + expectedNumber, counter2); + } + + /** + * Test to verify the head of the queue list is returned as expected. + * @see QueueEntryListTestBase#getTestList() + * @see QueueEntryListTestBase#getExpectedFirstMsgId() + */ + public void testGetHead() + { + final QueueEntry head = getTestList().getHead(); + assertNull("Head entry should not contain an actual message", head.getMessage()); + assertEquals("Unexpected message id for first list entry", getExpectedFirstMsgId(), getTestList().next(head) + .getMessage().getMessageNumber().longValue()); + } + + /** + * Test to verify the entry deletion handled correctly. + * @see QueueEntryListTestBase#getTestList() + */ + public void testEntryDeleted() + { + final QueueEntry head = getTestList().getHead(); + + final QueueEntry first = getTestList().next(head); + first.delete(); + + final QueueEntry second = getTestList().next(head); + assertNotSame("After deletion the next entry should be different", first.getMessage().getMessageNumber(), second + .getMessage().getMessageNumber()); + + final QueueEntry third = getTestList().next(first); + assertEquals("After deletion the deleted nodes next node should be the same as the next from head", second + .getMessage().getMessageNumber(), third.getMessage().getMessageNumber()); + } + +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryTest.java deleted file mode 100644 index b67723dd25..0000000000 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryTest.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.test.utils.QpidTestCase; - -/** - * - * Tests QueueEntry - * - */ -public class QueueEntryTest extends QpidTestCase -{ - private QueueEntryImpl _queueEntry1 = null; - private QueueEntryImpl _queueEntry2 = null; - private QueueEntryImpl _queueEntry3 = null; - - @Override - protected void setUp() throws Exception - { - super.setUp(); - - int i = 0; - - SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(null); - _queueEntry1 = (QueueEntryImpl) queueEntryList.add(new MockAMQMessage(i++)); - _queueEntry2 = (QueueEntryImpl) queueEntryList.add(new MockAMQMessage(i++)); - _queueEntry3 = (QueueEntryImpl) queueEntryList.add(new MockAMQMessage(i++)); - } - - public void testCompareTo() - { - assertTrue(_queueEntry1.compareTo(_queueEntry2) < 0); - assertTrue(_queueEntry2.compareTo(_queueEntry1) > 0); - assertTrue(_queueEntry1.compareTo(_queueEntry1) == 0); - } - - /** - * Tests that the getNext() can be used to traverse the list. - */ - public void testTraverseWithNoDeletedEntries() - { - QueueEntryImpl current = _queueEntry1; - - current = current.getNext(); - assertSame("Unexpected current entry",_queueEntry2, current); - - current = current.getNext(); - assertSame("Unexpected current entry",_queueEntry3, current); - - current = current.getNext(); - assertNull(current); - - } - - /** - * Tests that the getNext() can be used to traverse the list but deleted - * entries are skipped and de-linked from the chain of entries. - */ - public void testTraverseWithDeletedEntries() - { - // Delete 2nd queue entry - _queueEntry2.delete(); - assertTrue(_queueEntry2.isDeleted()); - - - QueueEntryImpl current = _queueEntry1; - - current = current.getNext(); - assertSame("Unexpected current entry",_queueEntry3, current); - - current = current.getNext(); - assertNull(current); - - // Assert the side effects of getNext() - assertSame("Next node of entry 1 should now be entry 3", - _queueEntry3, _queueEntry1.nextNode()); - } -} diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java new file mode 100644 index 0000000000..7ff693e4c4 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import junit.framework.Assert; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.queue.SortedQueueEntryImpl.Colour; + +/** + * Test extension of SortedQueueEntryList that provides data structure validation tests. + * @see SortedQueueEntryList + */ +public class SelfValidatingSortedQueueEntryList extends SortedQueueEntryList +{ + public SelfValidatingSortedQueueEntryList(AMQQueue queue, String propertyName) + { + super(queue, propertyName); + } + + @Override /** Overridden to automatically check queue properties before and after. */ + public SortedQueueEntryImpl add(final ServerMessage message) + { + assertQueueProperties(); //before add + final SortedQueueEntryImpl result = super.add(message); + assertQueueProperties(); //after add + return result; + } + + @Override /** Overridden to automatically check queue properties before and after. */ + public void entryDeleted(SortedQueueEntryImpl entry) + { + assertQueueProperties(); //before delete + super.entryDeleted(entry); + assertQueueProperties(); //after delete + } + + public void assertQueueProperties() + { + assertRootIsBlack(); + assertTreeIntegrity(); + assertChildrenOfRedAreBlack(); + assertLeavesSameBlackPath(); + } + + public void assertRootIsBlack() + { + if(!isNodeColour(getRoot(), Colour.BLACK)) + { + Assert.fail("Root Not Black"); + } + } + + public void assertTreeIntegrity() + { + assertTreeIntegrity(getRoot()); + } + + public void assertTreeIntegrity(final SortedQueueEntryImpl node) + { + if(node == null) + { + return; + } + if(node.getLeft() != null) + { + if(node.getLeft().getParent() == node) + { + assertTreeIntegrity(node.getLeft()); + } + else + { + Assert.fail("Tree integrity compromised"); + } + } + if(node.getRight() != null) + { + if(node.getRight().getParent() == node) + { + assertTreeIntegrity(node.getRight()); + } + else + { + Assert.fail("Tree integrity compromised"); + } + + } + } + + public void assertLeavesSameBlackPath() + { + assertLeavesSameBlackPath(getRoot()); + } + + public int assertLeavesSameBlackPath(final SortedQueueEntryImpl node) + { + if(node == null) + { + return 1; + } + final int left = assertLeavesSameBlackPath(node.getLeft()); + final int right = assertLeavesSameBlackPath(node.getLeft()); + if(left == right) + { + return isNodeColour(node, Colour.BLACK) ? 1 + left : left; + } + else + { + Assert.fail("Unequal paths to leaves"); + return 1; //compiler + } + } + + public void assertChildrenOfRedAreBlack() + { + assertChildrenOfRedAreBlack(getRoot()); + } + + public void assertChildrenOfRedAreBlack(final SortedQueueEntryImpl node) + { + if(node == null) + { + return; + } + else if(node.getColour() == Colour.BLACK) + { + assertChildrenOfRedAreBlack(node.getLeft()); + assertChildrenOfRedAreBlack(node.getRight()); + } + else + { + if(isNodeColour(node.getLeft(), Colour.BLACK) + && isNodeColour(node.getRight(), Colour.BLACK)) + { + assertChildrenOfRedAreBlack(node.getLeft()); + assertChildrenOfRedAreBlack(node.getRight()); + } + else + { + Assert.fail("Children of Red are not both black"); + } + } + } +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index f4cdbbe02c..e4ed232f13 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -21,8 +21,12 @@ package org.apache.qpid.server.queue; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.commons.configuration.PropertiesConfiguration; - import org.apache.qpid.AMQException; import org.apache.qpid.AMQInternalException; import org.apache.qpid.AMQSecurityException; @@ -40,6 +44,7 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue.PostEnqueueAction; import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter; import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.subscription.MockSubscription; @@ -51,12 +56,6 @@ import org.apache.qpid.server.util.InternalBrokerBaseCase; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - public class SimpleAMQQueueTest extends InternalBrokerBaseCase { @@ -1110,9 +1109,9 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase * Entries with even message id are considered * dequeued! */ - protected QueueEntryImpl createQueueEntry(final ServerMessage message) + protected SimpleQueueEntryImpl createQueueEntry(final ServerMessage message) { - return new QueueEntryImpl(this, message) + return new SimpleQueueEntryImpl(this, message) { public boolean isDequeued() { diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java new file mode 100644 index 0000000000..d8d78bbb84 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java @@ -0,0 +1,59 @@ +package org.apache.qpid.server.queue; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.ServerMessage; + +public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase { + + private SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test")); + + public QueueEntryImpl getQueueEntryImpl(int msgId) throws AMQException { + ServerMessage message = new MockAMQMessage(msgId); + return queueEntryList.add(message); + } + + public void testCompareTo() + { + assertTrue(_queueEntry.compareTo(_queueEntry2) < 0); + assertTrue(_queueEntry2.compareTo(_queueEntry3) < 0); + assertTrue(_queueEntry.compareTo(_queueEntry3) < 0); + + assertTrue(_queueEntry2.compareTo(_queueEntry) > 0); + assertTrue(_queueEntry3.compareTo(_queueEntry2) > 0); + assertTrue(_queueEntry3.compareTo(_queueEntry) > 0); + + assertTrue(_queueEntry.compareTo(_queueEntry) == 0); + assertTrue(_queueEntry2.compareTo(_queueEntry2) == 0); + assertTrue(_queueEntry3.compareTo(_queueEntry3) == 0); + } + + public void testTraverseWithNoDeletedEntries() + { + QueueEntry current = _queueEntry; + + current = current.getNextValidEntry(); + assertSame("Unexpected current entry",_queueEntry2, current); + + current = current.getNextValidEntry(); + assertSame("Unexpected current entry",_queueEntry3, current); + + current = current.getNextValidEntry(); + assertNull(current); + + } + + public void testTraverseWithDeletedEntries() + { + // Delete 2nd queue entry + _queueEntry2.delete(); + assertTrue(_queueEntry2.isDeleted()); + + QueueEntry current = _queueEntry; + + current = current.getNextValidEntry(); + assertSame("Unexpected current entry",_queueEntry3, current); + + current = current.getNextValidEntry(); + assertNull(current); + } +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java index 7136f07ca5..f3ba6a5495 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java @@ -22,21 +22,28 @@ package org.apache.qpid.server.queue; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; - import org.apache.qpid.AMQException; import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.message.ServerMessage; -import junit.framework.TestCase; - -public class SimpleQueueEntryListTest extends TestCase +public class SimpleQueueEntryListTest extends QueueEntryListTestBase { + private SimpleQueueEntryList _sqel; + private static final String SCAVENGE_PROP = "qpid.queue.scavenge_count"; String oldScavengeValue = null; - + @Override protected void setUp() { oldScavengeValue = System.setProperty(SCAVENGE_PROP, "9"); + _sqel = new SimpleQueueEntryList(_testQueue); + for(int i = 1; i <= 100; i++) + { + final ServerMessage msg = new MockAMQMessage(i); + final QueueEntry bleh = _sqel.add(msg); + assertNotNull("QE should not have been null", bleh); + } } @Override @@ -52,19 +59,28 @@ public class SimpleQueueEntryListTest extends TestCase } } - /** - * Tests the behavior of the next(QueuyEntry) method. - */ - public void testNext() throws Exception + @Override + public QueueEntryList getTestList() { - SimpleQueueEntryList sqel = new SimpleQueueEntryList(null); - int i = 0; + return _sqel; + } + + @Override + public long getExpectedFirstMsgId() + { + return 1; + } - QueueEntry queueEntry1 = sqel.add(new MockAMQMessage(i++)); - QueueEntry queueEntry2 = sqel.add(new MockAMQMessage(i++)); + @Override + public int getExpectedListLength() + { + return 100; + } - assertSame(queueEntry2, sqel.next(queueEntry1)); - assertNull(sqel.next(queueEntry2)); + @Override + public AMQMessage getTestMessageToAdd() throws AMQException + { + return new MockAMQMessage(1l); } public void testScavenge() throws Exception @@ -82,7 +98,7 @@ public class SimpleQueueEntryListTest extends TestCase entriesMap.put(i,bleh); } - QueueEntryImpl head = ((QueueEntryImpl) sqel.getHead()); + SimpleQueueEntryImpl head = sqel.getHead(); //We shall now delete some specific messages mid-queue that will lead to //requiring a scavenge once the requested threshold of 9 deletes is passed @@ -99,11 +115,10 @@ public class SimpleQueueEntryListTest extends TestCase assertTrue("Failed to delete QueueEntry", entriesMap.remove(14).delete()); verifyDeletedButPresentBeforeScavenge(head, 14); - //Delete message 20 only assertTrue("Failed to delete QueueEntry", entriesMap.remove(20).delete()); verifyDeletedButPresentBeforeScavenge(head, 20); - + //Delete messages 81 to 84 assertTrue("Failed to delete QueueEntry", entriesMap.remove(81).delete()); verifyDeletedButPresentBeforeScavenge(head, 81); @@ -113,35 +128,35 @@ public class SimpleQueueEntryListTest extends TestCase verifyDeletedButPresentBeforeScavenge(head, 83); assertTrue("Failed to delete QueueEntry", entriesMap.remove(84).delete()); verifyDeletedButPresentBeforeScavenge(head, 84); - + //Delete message 99 - this is the 10th message deleted that is after the queue head //and so will invoke the scavenge() which is set to go after 9 previous deletions assertTrue("Failed to delete QueueEntry", entriesMap.remove(99).delete()); verifyAllDeletedMessagedNotPresent(head, entriesMap); } - - private void verifyDeletedButPresentBeforeScavenge(QueueEntryImpl head, long messageId) + + private void verifyDeletedButPresentBeforeScavenge(SimpleQueueEntryImpl head, long messageId) { //Use the head to get the initial entry in the queue - QueueEntryImpl entry = head._next; - + SimpleQueueEntryImpl entry = head.getNextNode(); + for(long i = 1; i < messageId ; i++) { assertEquals("Expected QueueEntry was not found in the list", i, (long) entry.getMessage().getMessageNumber()); - entry = entry._next; + entry = entry.getNextNode(); } - + assertTrue("Entry should have been deleted", entry.isDeleted()); } - - private void verifyAllDeletedMessagedNotPresent(QueueEntryImpl head, Map<Integer,QueueEntry> remainingMessages) + + private void verifyAllDeletedMessagedNotPresent(SimpleQueueEntryImpl head, Map<Integer,QueueEntry> remainingMessages) { //Use the head to get the initial entry in the queue - QueueEntryImpl entry = head._next; - + SimpleQueueEntryImpl entry = head.getNextNode(); + assertNotNull("Initial entry should not have been null", entry); - + int count = 0; while (entry != null) @@ -149,62 +164,56 @@ public class SimpleQueueEntryListTest extends TestCase assertFalse("Entry " + entry.getMessage().getMessageNumber() + " should not have been deleted", entry.isDeleted()); assertNotNull("QueueEntry was not found in the list of remaining entries", remainingMessages.get(entry.getMessage().getMessageNumber().intValue())); - + count++; - entry = entry._next; + entry = entry.getNextNode(); } - + assertEquals("Count should have been equal",count,remainingMessages.size()); } - public void testDequedMessagedNotPresentInIterator() + public void testGettingNextElement() { - int numberOfMessages = 10; - SimpleQueueEntryList entryList = new SimpleQueueEntryList(new MockAMQQueue("test")); - QueueEntry[] entries = new QueueEntry[numberOfMessages]; + final int numberOfEntries = 5; + final SimpleQueueEntryImpl[] entries = new SimpleQueueEntryImpl[numberOfEntries]; + final SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test")); - for(int i = 0; i < numberOfMessages ; i++) + // create test entries + for(int i = 0; i < numberOfEntries; i++) { - AMQMessage message = null;; - try - { - message = new MockAMQMessage(i); - } - catch (AMQException e) - { - fail("Failure to create a mock message:" + e.getMessage()); - } - QueueEntry entry = entryList.add(message); - assertNotNull("QE should not be null", entry); - entries[i]= entry; + AMQMessage message = new MockAMQMessage(i); + entries[i] = queueEntryList.add(message); } - // dequeue all even messages - for (QueueEntry queueEntry : entries) + // test getNext for not acquired entries + for(int i = 0; i < numberOfEntries; i++) { - long i = ((AMQMessage)queueEntry.getMessage()).getMessageId().longValue(); - if (i%2 == 0) + final SimpleQueueEntryImpl next = entries[i].getNextValidEntry(); + + if(i < numberOfEntries - 1) + { + assertEquals("Unexpected entry from QueueEntryImpl#getNext()", entries[i + 1], next); + } + else { - queueEntry.acquire(); - queueEntry.dequeue(); + assertNull("The next entry after the last should be null", next); } } - // iterate and check that dequeued messages are not returned by iterator - QueueEntryIterator it = entryList.iterator(); - int counter = 0; - int i = 1; - while (it.advance()) - { - QueueEntry entry = it.getNode(); - Long id = ((AMQMessage)entry.getMessage()).getMessageId(); - assertEquals("Expected message with id " + i + " but got message with id " - + id, new Long(i), id); - counter++; - i += 2; - } - int expectedNumber = numberOfMessages / 2; - assertEquals("Expected " + expectedNumber + " number of entries in iterator but got " + counter, - expectedNumber, counter); + // delete second + entries[1].acquire(); + entries[1].delete(); + + // dequeue third + entries[2].acquire(); + entries[2].dequeue(); + + SimpleQueueEntryImpl next = entries[2].getNextValidEntry(); + assertEquals("expected forth entry", entries[3], next); + next = next.getNextValidEntry(); + assertEquals("expected fifth entry", entries[4], next); + next = next.getNextValidEntry(); + assertNull("The next entry after the last should be null", next); } + } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java new file mode 100644 index 0000000000..43fb5b4cb3 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java @@ -0,0 +1,62 @@ +package org.apache.qpid.server.queue; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.ServerMessage; + +public class SortedQueueEntryImplTest extends QueueEntryImplTestBase { + + public final static String keys[] = { "CCC", "AAA", "BBB" }; + + private SelfValidatingSortedQueueEntryList queueEntryList = new SelfValidatingSortedQueueEntryList(new MockAMQQueue("test"),"KEY"); + + public QueueEntryImpl getQueueEntryImpl(int msgId) throws AMQException { + final ServerMessage message = new MockAMQMessage(msgId, "KEY", keys[msgId-1]); + return queueEntryList.add(message); + } + + public void testCompareTo() + { + assertTrue(_queueEntry.compareTo(_queueEntry2) > 0); + assertTrue(_queueEntry.compareTo(_queueEntry3) > 0); + + assertTrue(_queueEntry2.compareTo(_queueEntry3) < 0); + assertTrue(_queueEntry2.compareTo(_queueEntry) < 0); + + assertTrue(_queueEntry3.compareTo(_queueEntry2) > 0); + assertTrue(_queueEntry3.compareTo(_queueEntry) < 0); + + assertTrue(_queueEntry.compareTo(_queueEntry) == 0); + assertTrue(_queueEntry2.compareTo(_queueEntry2) == 0); + assertTrue(_queueEntry3.compareTo(_queueEntry3) == 0); + } + + public void testTraverseWithNoDeletedEntries() + { + QueueEntry current = _queueEntry2; + + current = current.getNextValidEntry(); + assertSame("Unexpected current entry",_queueEntry3, current); + + current = current.getNextValidEntry(); + assertSame("Unexpected current entry",_queueEntry, current); + + current = current.getNextValidEntry(); + assertNull(current); + + } + + public void testTraverseWithDeletedEntries() + { + // Delete 2nd queue entry + _queueEntry3.delete(); + assertTrue(_queueEntry3.isDeleted()); + + QueueEntry current = _queueEntry2; + + current = current.getNextValidEntry(); + assertSame("Unexpected current entry",_queueEntry, current); + + current = current.getNextValidEntry(); + assertNull(current); + } +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java new file mode 100644 index 0000000000..eca845644e --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java @@ -0,0 +1,323 @@ +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.message.AMQMessage; + +import java.util.Arrays; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.ServerMessage; + +public class SortedQueueEntryListTest extends QueueEntryListTestBase +{ + private static SelfValidatingSortedQueueEntryList _sqel; + + public final static String keys[] = { " 73", " 18", " 11", "127", "166", "163", " 69", " 60", "191", "144", + " 17", "161", "145", "140", "157", " 47", "136", " 56", "176", " 81", + "195", " 96", " 2", " 68", "101", "141", "159", "187", "149", " 45", + " 64", "100", " 83", " 51", " 79", " 82", "180", " 26", " 61", " 62", + " 78", " 46", "147", " 91", "120", "164", " 92", "172", "188", " 50", + "111", " 89", " 4", " 8", " 16", "151", "122", "178", " 33", "124", + "171", "165", "116", "113", "155", "148", " 29", " 0", " 37", "131", + "146", " 57", "112", " 97", " 23", "108", "123", "117", "167", " 52", + " 98", " 6", "160", " 25", " 49", " 34", "182", "185", " 30", " 66", + "152", " 58", " 86", "118", "189", " 84", " 36", "104", " 7", " 76", + " 87", " 1", " 80", " 10", "142", " 59", "137", " 12", " 67", " 22", + " 9", "106", " 75", "109", " 93", " 42", "177", "134", " 77", " 88", + "114", " 43", "143", "135", " 55", "181", " 32", "174", "175", "184", + "133", "107", " 28", "126", "103", " 85", " 38", "158", " 39", "162", + "129", "194", " 15", " 24", " 19", " 35", "186", " 31", " 65", " 99", + "192", " 74", "156", " 27", " 95", " 54", " 70", " 13", "110", " 41", + " 90", "173", "125", "196", "130", "183", "102", "190", "132", "105", + " 21", " 53", "139", " 94", "115", " 48", " 44", "179", "128", " 14", + " 72", "119", "153", "168", "197", " 40", "150", "138", " 5", "154", + "169", " 71", "199", "198", "170", " 3", "121", " 20", " 63", "193" }; + + public final static String textkeys[] = { "AAA", "BBB", "CCC", "DDD", "EEE", "FFF", "GGG", "HHH", "III", "JJJ", + "KKK", "LLL", "MMM", "NNN", "OOO", "PPP", "QQQ", "RRR", "SSS", "TTT", + "UUU", "VVV", "XXX", "YYY", "ZZZ"}; + + private final static String keysSorted[] = keys.clone(); + + @Override + protected void setUp() throws Exception + { + super.setUp(); + + // Create result array + Arrays.sort(keysSorted); + + // Create test list + _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY"); + + // Build test list + long messageId = 0L; + for(final String key : keys) + { + final ServerMessage msg = generateTestMessage(messageId++, key); + _sqel.add(msg); + } + + } + + public QueueEntryList getTestList() + { + return _sqel; + } + + public int getExpectedListLength() + { + return keys.length; + } + + public long getExpectedFirstMsgId() + { + return 67L; + } + + public ServerMessage getTestMessageToAdd() throws AMQException + { + return generateTestMessage(1, "test value"); + } + + private ServerMessage generateTestMessage(final long id, final String keyValue) throws AMQException + { + return new AMQMessage(new MockStoredMessage(id, "KEY", keyValue)); + } + + public void testIterator() + { + super.testIterator(); + + // Test sorted order of list + final QueueEntryIterator<?> iter = getTestList().iterator(); + int count = 0; + while(iter.advance()) + { + assertEquals("Sorted queue entry value does not match sorted key array", + keysSorted[count++], getSortedKeyValue(iter)); + } + } + + private Object getSortedKeyValue(QueueEntryIterator<?> iter) + { + return ((SortedQueueEntryImpl) iter.getNode()).getMessage().getMessageHeader().getHeader("KEY"); + } + + private Long getMessageId(QueueEntryIterator<?> iter) + { + return ((SortedQueueEntryImpl) iter.getNode()).getMessage().getMessageNumber(); + } + + public void testNonUniqueSortKeys() throws Exception + { + _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY"); + + // Build test list + long messageId = 0L; + while(messageId < 200) + { + final ServerMessage msg = generateTestMessage(messageId++, "samekey"); + _sqel.add(msg); + } + + final QueueEntryIterator<?> iter = getTestList().iterator(); + int count=0; + while(iter.advance()) + { + assertEquals("Sorted queue entry value is not as expected", "samekey", getSortedKeyValue(iter)); + assertEquals("Message id not as expected", Long.valueOf(count++), getMessageId(iter)); + } + } + + public void testNullSortKeys() throws Exception + { + _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY"); + + // Build test list + long messageId = 0L; + while(messageId < 200) + { + final ServerMessage msg = generateTestMessage(messageId++, null); + _sqel.add(msg); + } + + final QueueEntryIterator<?> iter = getTestList().iterator(); + int count=0; + while(iter.advance()) + { + assertNull("Sorted queue entry value is not as expected", getSortedKeyValue(iter)); + assertEquals("Message id not as expected", Long.valueOf(count++), getMessageId(iter)); } + } + + public void testAscendingSortKeys() throws Exception + { + _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY"); + + // Build test list + long messageId = 0L; + for(String textKey : textkeys) + { + final ServerMessage msg = generateTestMessage(messageId, textKey); + messageId++; + _sqel.add(msg); + } + + final QueueEntryIterator<?> iter = getTestList().iterator(); + int count=0; + while(iter.advance()) + { + assertEquals("Sorted queue entry value is not as expected", textkeys[count], getSortedKeyValue(iter)); + assertEquals("Message id not as expected", Long.valueOf(count), getMessageId(iter)); + count++; + } + } + + public void testDescendingSortKeys() throws Exception + { + _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY"); + + // Build test list + long messageId = 0L; + for(int i=textkeys.length-1; i >=0; i--) + { + final ServerMessage msg = generateTestMessage(messageId, textkeys[i]); + messageId++; + _sqel.add(msg); + } + + final QueueEntryIterator<?> iter = getTestList().iterator(); + int count=0; + while(iter.advance()) + { + assertEquals("Sorted queue entry value is not as expected", textkeys[count], getSortedKeyValue(iter)); + assertEquals("Message id not as expected", Long.valueOf(textkeys.length-count-1), getMessageId(iter)); + count++; + } + } + + public void testInsertAfter() throws Exception + { + _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY"); + + ServerMessage msg = generateTestMessage(1, "A"); + _sqel.add(msg); + + SortedQueueEntryImpl entry = _sqel.next(_sqel.getHead()); + validateEntry(entry, "A", 1); + + msg = generateTestMessage(2, "B"); + _sqel.add(msg); + + entry = _sqel.next(_sqel.getHead()); + validateEntry(entry, "A", 1); + + entry = _sqel.next(entry); + validateEntry(entry, "B", 2); + } + + public void testInsertBefore() throws Exception + { + _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY"); + + ServerMessage msg = generateTestMessage(1, "B"); + _sqel.add(msg); + + SortedQueueEntryImpl entry = _sqel.next(_sqel.getHead()); + validateEntry(entry, "B", 1); + + msg = generateTestMessage(2, "A"); + _sqel.add(msg); + + entry = _sqel.next(_sqel.getHead()); + validateEntry(entry, "A", 2); + + entry = _sqel.next(entry); + validateEntry(entry, "B", 1); + } + + public void testInsertInbetween() throws Exception + { + _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY"); + + ServerMessage msg = generateTestMessage(1, "A"); + _sqel.add(msg); + SortedQueueEntryImpl entry = _sqel.next(_sqel.getHead()); + validateEntry(entry, "A", 1); + + msg = generateTestMessage(2, "C"); + _sqel.add(msg); + + entry = _sqel.next(_sqel.getHead()); + validateEntry(entry, "A", 1); + + entry = _sqel.next(entry); + validateEntry(entry, "C", 2); + + msg = generateTestMessage(3, "B"); + _sqel.add(msg); + + entry = _sqel.next(_sqel.getHead()); + validateEntry(entry, "A", 1); + + entry = _sqel.next(entry); + validateEntry(entry, "B", 3); + + entry = _sqel.next(entry); + validateEntry(entry, "C", 2); + } + + public void testInsertAtHead() throws Exception + { + _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY"); + + ServerMessage msg = generateTestMessage(1, "B"); + _sqel.add(msg); + + SortedQueueEntryImpl entry = _sqel.next(_sqel.getHead()); + validateEntry(entry, "B", 1); + + msg = generateTestMessage(2, "D"); + _sqel.add(msg); + + entry = _sqel.next(_sqel.getHead()); + validateEntry(entry, "B", 1); + + entry = _sqel.next(entry); + validateEntry(entry, "D", 2); + + msg = generateTestMessage(3, "C"); + _sqel.add(msg); + + entry = _sqel.next(_sqel.getHead()); + validateEntry(entry, "B", 1); + + entry = _sqel.next(entry); + validateEntry(entry, "C", 3); + + entry = _sqel.next(entry); + validateEntry(entry, "D", 2); + + msg = generateTestMessage(4, "A"); + _sqel.add(msg); + + entry = _sqel.next(_sqel.getHead()); + validateEntry(entry, "A", 4); + + entry = _sqel.next(entry); + validateEntry(entry, "B", 1); + + entry = _sqel.next(entry); + validateEntry(entry, "C", 3); + + entry = _sqel.next(entry); + validateEntry(entry, "D", 2); + } + + private void validateEntry(final SortedQueueEntryImpl entry, final String expectedSortKey, final long expectedMessageId) + { + assertEquals("Sorted queue entry value is not as expected", + expectedSortKey, entry.getMessage().getMessageHeader().getHeader("KEY")); + assertEquals("Sorted queue entry id is not as expected", + Long.valueOf(expectedMessageId), entry.getMessage().getMessageNumber()); + } + +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java index 3acd064fd7..1d0a9d6316 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -689,7 +689,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase if (usePriority) { queueArguments = new FieldTable(); - queueArguments.put(AMQQueueFactory.X_QPID_PRIORITIES, DEFAULT_PRIORTY_LEVEL); + queueArguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), DEFAULT_PRIORTY_LEVEL); } if (lastValueQueue) @@ -767,7 +767,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase private void bindAllQueuesToExchange(Exchange exchange, AMQShortString routingKey) { FieldTable queueArguments = new FieldTable(); - queueArguments.put(AMQQueueFactory.X_QPID_PRIORITIES, DEFAULT_PRIORTY_LEVEL); + queueArguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), DEFAULT_PRIORTY_LEVEL); QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry(); @@ -781,7 +781,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase private void bindAllTopicQueuesToExchange(Exchange exchange, AMQShortString routingKey) { FieldTable queueArguments = new FieldTable(); - queueArguments.put(AMQQueueFactory.X_QPID_PRIORITIES, DEFAULT_PRIORTY_LEVEL); + queueArguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), DEFAULT_PRIORTY_LEVEL); QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry(); diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java index 2ce1251eab..962aec0d1e 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityQueueTest.java @@ -20,35 +20,26 @@ */ package org.apache.qpid.server.queue; -import junit.framework.TestCase; -import junit.framework.Assert; -import org.apache.log4j.Logger; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.url.URLSyntaxException; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; - -import javax.jms.*; -import javax.naming.NamingException; -import javax.naming.Context; -import javax.naming.spi.InitialContextFactory; -import java.util.Hashtable; import java.util.HashMap; import java.util.Map; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.naming.NamingException; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.test.utils.QpidBrokerTestCase; -public class PriorityTest extends QpidBrokerTestCase +public class PriorityQueueTest extends QpidBrokerTestCase { private static final int TIMEOUT = 1500; - - private static final Logger _logger = Logger.getLogger(PriorityTest.class); - protected final String QUEUE = "PriorityQueue"; private static final int MSG_COUNT = 50; @@ -60,9 +51,8 @@ public class PriorityTest extends QpidBrokerTestCase private Connection consumerConnection; private Session consumerSession; - private MessageConsumer consumer; - + protected void setUp() throws Exception { super.setUp(); @@ -71,10 +61,10 @@ public class PriorityTest extends QpidBrokerTestCase producerSession = producerConnection.createSession(true, Session.AUTO_ACKNOWLEDGE); producerConnection.start(); - + consumerConnection = getConnection(); consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - + } protected void tearDown() throws Exception @@ -111,7 +101,7 @@ public class PriorityTest extends QpidBrokerTestCase Message previous = null; int messageCount = 0; while((received = consumer.receive(1000))!=null) - { + { messageCount++; if(previous != null) { @@ -124,17 +114,17 @@ public class PriorityTest extends QpidBrokerTestCase assertEquals("Incorrect number of message received", 50, receivedCount); } - + public void testOddOrdering() throws AMQException, JMSException { final Map<String,Object> arguments = new HashMap<String, Object>(); arguments.put("x-qpid-priorities",3); ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); queue = producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'"); - + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); producer = producerSession.createProducer(queue); - + // In order ABC producer.setPriority(9); producer.send(nextMessage(1, false, producerSession, producer)); @@ -151,14 +141,14 @@ public class PriorityTest extends QpidBrokerTestCase producer.setPriority(1); producer.send(nextMessage(6, false, producerSession, producer)); - // Out of order BCA + // Out of order BCA producer.setPriority(4); producer.send(nextMessage(7, false, producerSession, producer)); producer.setPriority(1); producer.send(nextMessage(8, false, producerSession, producer)); producer.setPriority(9); producer.send(nextMessage(9, false, producerSession, producer)); - + // Reverse order CBA producer.setPriority(1); producer.send(nextMessage(10, false, producerSession, producer)); @@ -167,10 +157,10 @@ public class PriorityTest extends QpidBrokerTestCase producer.setPriority(9); producer.send(nextMessage(12, false, producerSession, producer)); producerSession.commit(); - + consumer = consumerSession.createConsumer(queue); consumerConnection.start(); - + Message msg = consumer.receive(TIMEOUT); assertEquals(1, msg.getIntProperty("msg")); msg = consumer.receive(TIMEOUT); @@ -179,7 +169,7 @@ public class PriorityTest extends QpidBrokerTestCase assertEquals(9, msg.getIntProperty("msg")); msg = consumer.receive(TIMEOUT); assertEquals(12, msg.getIntProperty("msg")); - + msg = consumer.receive(TIMEOUT); assertEquals(2, msg.getIntProperty("msg")); msg = consumer.receive(TIMEOUT); @@ -188,7 +178,7 @@ public class PriorityTest extends QpidBrokerTestCase assertEquals(7, msg.getIntProperty("msg")); msg = consumer.receive(TIMEOUT); assertEquals(11, msg.getIntProperty("msg")); - + msg = consumer.receive(TIMEOUT); assertEquals(3, msg.getIntProperty("msg")); msg = consumer.receive(TIMEOUT); @@ -206,6 +196,4 @@ public class PriorityTest extends QpidBrokerTestCase return send; } - - } diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java new file mode 100644 index 0000000000..83046b84a5 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java @@ -0,0 +1,576 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import java.util.Arrays; +import java.util.Calendar; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.naming.NamingException; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +public class SortedQueueTest extends QpidBrokerTestCase +{ + private static final Logger LOGGER = Logger.getLogger(SortedQueueTest.class); + public static final String TEST_SORT_KEY = "testSortKey"; + private static final String VALUES[] = SortedQueueEntryListTest.keys.clone(); + private static final String VALUES_SORTED[] = SortedQueueEntryListTest.keys.clone(); + public final static String SUBSET_KEYS[] = { "000", "100", "200", "300", "400", "500", "600", "700", "800", "900" }; + + private Connection _producerConnection; + private Session _producerSession; + private Connection _consumerConnection; + + protected void setUp() throws Exception + { + super.setUp(); + setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, "1"); + // Sort value array to generated "expected" order of messages. + Arrays.sort(VALUES_SORTED); + _producerConnection = getConnection(); + _consumerConnection = getConnection(); + _producerSession = _producerConnection.createSession(true, Session.SESSION_TRANSACTED); + } + + protected void tearDown() throws Exception + { + _producerSession.close(); + _producerConnection.close(); + _consumerConnection.close(); + super.tearDown(); + } + + public void testSortOrder() throws JMSException, NamingException, AMQException + { + final Queue queue = createQueue(); + final MessageProducer producer = _producerSession.createProducer(queue); + + for(String value : VALUES) + { + final Message msg = _producerSession.createTextMessage("Message Text:" + value); + msg.setStringProperty(TEST_SORT_KEY, value); + producer.send(msg); + } + + _producerSession.commit(); + producer.close(); + + final Session consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final MessageConsumer consumer = consumerSession.createConsumer(queue); + _consumerConnection.start(); + TextMessage received; + int messageCount = 0; + while((received = (TextMessage) consumer.receive(1000)) != null) + { + assertEquals("Received message with unexpected sorted key value", VALUES_SORTED[messageCount], + received.getStringProperty(TEST_SORT_KEY)); + assertEquals("Received message with unexpected message value", + "Message Text:" + VALUES_SORTED[messageCount], received.getText()); + messageCount++; + } + + assertEquals("Incorrect number of messages received", VALUES.length, messageCount); + } + + public void testAutoAckSortedQueue() throws JMSException, NamingException, AMQException + { + runThroughSortedQueueForSessionMode(Session.AUTO_ACKNOWLEDGE); + } + + public void testTransactedSortedQueue() throws JMSException, NamingException, AMQException + { + runThroughSortedQueueForSessionMode(Session.SESSION_TRANSACTED); + } + + public void testClientAckSortedQueue() throws JMSException, NamingException, AMQException + { + runThroughSortedQueueForSessionMode(Session.CLIENT_ACKNOWLEDGE); + } + + private void runThroughSortedQueueForSessionMode(final int sessionMode) throws JMSException, NamingException, + AMQException + { + final Queue queue = createQueue(); + final MessageProducer producer = _producerSession.createProducer(queue); + + final TestConsumerThread consumerThread = new TestConsumerThread(sessionMode, queue); + consumerThread.start(); + final Calendar cal = Calendar.getInstance(Locale.UK); + + for(String value : VALUES) + { + final Message msg = _producerSession.createTextMessage(String.valueOf(cal.getTimeInMillis())); + msg.setStringProperty(TEST_SORT_KEY, value); + producer.send(msg); + _producerSession.commit(); + } + + synchronized(consumerThread) + { + try + { + consumerThread.join(5000L); + } + catch(InterruptedException e) + { + fail("Test failed waiting for consumer to complete"); + } + } + assertTrue("Consumer timed out", consumerThread.isStopped()); + assertEquals("Incorrect number of messages received", VALUES.length, consumerThread.getConsumed()); + + producer.close(); + } + + public void testSortedQueueWithAscendingSortedKeys() throws JMSException, NamingException, AMQException + { + final Queue queue = createQueue(); + final MessageProducer producer = _producerSession.createProducer(queue); + + final TestConsumerThread consumerThread = new TestConsumerThread(Session.AUTO_ACKNOWLEDGE, queue); + consumerThread.start(); + + for(int i = 0; i < 200; i++) + { + final String ascendingKey = AscendingSortedKeys.getNextKey(); + final Message msg = _producerSession.createTextMessage("Message Text:" + ascendingKey); + msg.setStringProperty(TEST_SORT_KEY, ascendingKey); + producer.send(msg); + _producerSession.commit(); + } + + synchronized(consumerThread) + { + try + { + consumerThread.join(5000L); + } + catch(InterruptedException e) + { + fail("Test failed waiting for consumer to complete"); + } + } + assertTrue("Consumer timed out", consumerThread.isStopped()); + assertEquals("Incorrect number of messages received", 200, consumerThread.getConsumed()); + + producer.close(); + } + + public void testSortOrderWithNonUniqueKeys() throws JMSException, NamingException, AMQException + { + final Queue queue = createQueue(); + final MessageProducer producer = _producerSession.createProducer(queue); + + int count = 0; + while(count < 200) + { + final Message msg = _producerSession.createTextMessage("Message Text:" + count++); + msg.setStringProperty(TEST_SORT_KEY, "samesortkeyvalue"); + producer.send(msg); + } + + _producerSession.commit(); + producer.close(); + + final Session consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final MessageConsumer consumer = consumerSession.createConsumer(queue); + _consumerConnection.start(); + TextMessage received = null; + int messageCount = 0; + + while((received = (TextMessage) consumer.receive(1000)) != null) + { + assertEquals("Received message with unexpected sorted key value", "samesortkeyvalue", + received.getStringProperty(TEST_SORT_KEY)); + assertEquals("Received message with unexpected message value", "Message Text:" + messageCount, + received.getText()); + messageCount++; + } + + assertEquals("Incorrect number of messages received", 200, messageCount); + } + + public void testSortOrderWithUniqueKeySubset() throws JMSException, NamingException, AMQException + { + final Queue queue = createQueue(); + final MessageProducer producer = _producerSession.createProducer(queue); + + int count = 0; + while(count < 100) + { + int keyValueIndex = count % 10; + final Message msg = _producerSession.createTextMessage("Message Text:" + count); + msg.setStringProperty(TEST_SORT_KEY, SUBSET_KEYS[keyValueIndex]); + producer.send(msg); + count++; + } + + _producerSession.commit(); + producer.close(); + + final Session consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final MessageConsumer consumer = consumerSession.createConsumer(queue); + _consumerConnection.start(); + TextMessage received; + int messageCount = 0; + + while((received = (TextMessage) consumer.receive(1000)) != null) + { + assertEquals("Received message with unexpected sorted key value", SUBSET_KEYS[messageCount / 10], + received.getStringProperty(TEST_SORT_KEY)); + messageCount++; + } + + assertEquals("Incorrect number of messages received", 100, messageCount); + } + + public void testGetNext() throws JMSException, NamingException, AMQException + { + final Queue queue = createQueue(); + final MessageProducer producer = _producerSession.createProducer(queue); + + sendAndCommitMessage(producer,"2"); + sendAndCommitMessage(producer,"3"); + sendAndCommitMessage(producer,"1"); + + final Session consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final MessageConsumer consumer = consumerSession.createConsumer(queue); + _consumerConnection.start(); + receiveAndValidateMessage(consumer, "1"); + receiveAndValidateMessage(consumer, "2"); + receiveAndValidateMessage(consumer, "3"); + + sendAndCommitMessage(producer,"4"); + + receiveAndValidateMessage(consumer, "4"); + + sendAndCommitMessage(producer,"7"); + sendAndCommitMessage(producer,"6"); + sendAndCommitMessage(producer,"5"); + + // pre-fetch causes "unexpected" order + receiveAndValidateMessage(consumer, "7"); + receiveAndValidateMessage(consumer, "5"); + receiveAndValidateMessage(consumer, "6"); + } + + public void testGetNextPreFetch() throws JMSException, NamingException, AMQException + { + final Queue queue = createQueue(); + final MessageProducer producer = _producerSession.createProducer(queue); + + sendAndCommitMessage(producer,"1"); + + final Session consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final MessageConsumer consumer = consumerSession.createConsumer(queue); + _consumerConnection.start(); + receiveAndValidateMessage(consumer, "1"); + + producer.send(getSortableTestMesssage("4")); + producer.send(getSortableTestMesssage("3")); + producer.send(getSortableTestMesssage("2")); + _producerSession.commit(); + + // pre-fetch causes "unexpected" order + receiveAndValidateMessage(consumer, "4"); + receiveAndValidateMessage(consumer, "2"); + receiveAndValidateMessage(consumer, "3"); + } + + public void testGetNextWithAck() throws JMSException, NamingException, AMQException + { + Queue _queue = createQueue(); + MessageProducer producer = _producerSession.createProducer(_queue); + Message received = null; + + //Send 3 out of order + sendAndCommitMessage(producer,"2"); + sendAndCommitMessage(producer,"3"); + sendAndCommitMessage(producer,"1"); + + final Session consumerSession = _consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + final MessageConsumer consumer = consumerSession.createConsumer(_queue); + _consumerConnection.start(); + + //Receive 3 in sorted order + received = receiveAndValidateMessage(consumer, "1"); + received.acknowledge(); + received = receiveAndValidateMessage(consumer, "2"); + received.acknowledge(); + received = receiveAndValidateMessage(consumer, "3"); + received.acknowledge(); + + //Send 1 + sendAndCommitMessage(producer,"4"); + + //Receive 1 and recover + received = receiveAndValidateMessage(consumer, "4"); + consumerSession.recover(); + + //Receive same 1 + received = receiveAndValidateMessage(consumer, "4"); + received.acknowledge(); + + //Send 3 out of order + sendAndCommitMessage(producer,"7"); + sendAndCommitMessage(producer,"6"); + sendAndCommitMessage(producer,"5"); + + //Receive 1 of 3 (out of order due to pre-fetch) and recover + received = receiveAndValidateMessage(consumer, "7"); + consumerSession.recover(); + + if (isBroker010()) + { + //Receive 3 in sorted order (not as per JMS recover) + received = receiveAndValidateMessage(consumer, "5"); + received.acknowledge(); + received = receiveAndValidateMessage(consumer, "6"); + received.acknowledge(); + received = receiveAndValidateMessage(consumer, "7"); + received.acknowledge(); + } + else + { + //Receive 3 in partial sorted order due to recover + received = receiveAndValidateMessage(consumer, "7"); + received.acknowledge(); + received = receiveAndValidateMessage(consumer, "5"); + received.acknowledge(); + received = receiveAndValidateMessage(consumer, "6"); + received.acknowledge(); + } + } + + protected Queue createQueue() throws AMQException, JMSException + { + final Map<String, Object> arguments = new HashMap<String, Object>(); + arguments.put(AMQQueueFactory.QPID_QUEUE_SORT_KEY, TEST_SORT_KEY); + ((AMQSession<?,?>) _producerSession).createQueue(new AMQShortString(getTestQueueName()), false, true, false, arguments); + final Queue queue = new AMQQueue("amq.direct", getTestQueueName()); + ((AMQSession<?,?>) _producerSession).declareAndBind((AMQDestination) queue); + return queue; + } + + private Message getSortableTestMesssage(final String key) throws JMSException + { + final Message msg = _producerSession.createTextMessage("Message Text: Key Value" + key); + msg.setStringProperty(TEST_SORT_KEY, key); + return msg; + } + + private void sendAndCommitMessage(final MessageProducer producer, final String keyValue) throws JMSException + { + producer.send(getSortableTestMesssage(keyValue)); + _producerSession.commit(); + } + + private Message receiveAndValidateMessage(final MessageConsumer consumer, final String expectedKey) throws JMSException + { + final Message received = (TextMessage) consumer.receive(10000); + assertNotNull("Received message is unexpectedly null", received); + assertEquals("Received message with unexpected sorted key value", expectedKey, + received.getStringProperty(TEST_SORT_KEY)); + return received; + } + + private class TestConsumerThread extends Thread + { + private boolean _stopped = false; + private int _count = 0; + private int _consumed = 0; + private int _sessionType = Session.AUTO_ACKNOWLEDGE; + private Queue _queue; + + public TestConsumerThread(final int sessionType, final Queue queue) + { + _sessionType = sessionType; + _queue = queue; + } + + public void run() + { + try + { + Connection conn = null; + try + { + conn = getConnection(); + } + catch(Exception e) + { + fail("Could not get connection"); + } + + final Session session = conn.createSession((_sessionType == Session.SESSION_TRANSACTED ? true : false), + _sessionType); + final MessageConsumer consumer = session.createConsumer(_queue); + + conn.start(); + + TextMessage msg; + Calendar cal = Calendar.getInstance(Locale.UK); + while((msg = (TextMessage) consumer.receive(1000)) != null) + { + if(_sessionType == Session.SESSION_TRANSACTED) + { + if (_count%10 == 0) + { + LOGGER.debug("transacted session rollback"); + session.rollback(); + } + else + { + LOGGER.debug("transacted session commit"); + session.commit(); + _consumed++; + } + } + else if(_sessionType == Session.CLIENT_ACKNOWLEDGE) + { + if (_count%10 == 0) + { + LOGGER.debug("client ack session recover"); + session.recover(); + } + else + { + LOGGER.debug("client ack session acknowledge"); + msg.acknowledge(); + _consumed++; + } + } + else + { + LOGGER.debug("auto ack session"); + _consumed++; + } + + _count++; + LOGGER.debug("Message consumed at : " + cal.getTimeInMillis()); + LOGGER.debug("Message consumed with key: " + msg.getStringProperty(TEST_SORT_KEY)); + LOGGER.debug("Message consumed with text: " + msg.getText()); + LOGGER.debug("Message consumed with consumed index: " + _consumed); + } + + _stopped = true; + session.close(); + conn.close(); + } + catch(JMSException e) + { + e.printStackTrace(); + } + } + + public synchronized boolean isStopped() + { + return _stopped; + } + + public synchronized int getConsumed() + { + return _consumed; + } + } + + private static class AscendingSortedKeys + { + public static final String[] KEYS = { "Ul4a1", "WaWsv", "2Yz7E", "ix74r", "okgRi", "HlUbF", "LewvM", "lweGy", + "TXQ0Z", "0Kyfs", "s7Mxk", "dmoS7", "8RCUA", "W3VFH", "aez9y", "uQIcz", "0h1b1", "cmXIX", + "4dEz6", "zHF1q", "D6rBy", "5drc6", "0BmCy", "BCxeC", "t59lR", "aL6AJ", "OHaBz", "WmadA", + "B3qem", "CxVEf", "AIYUu", "uJScX", "uoStw", "ogLgc", "AgJHQ", "hUTw7", "Rxrsm", "9GXkX", + "7hyVv", "y94nw", "Twano", "TCgPp", "pFrrl", "POUYS", "L7cGc", "0ao3l", "CNHmv", "MaJQs", + "OUqFM", "jeskS", "FPfSE", "v1Hln", "14FLR", "KZamH", "G1RhS", "FVMxo", "rKDLJ", "nnP8o", + "nFqik", "zmLYD", "1j5L8", "e6e4z", "WDVWJ", "aDGtS", "fcwDa", "nlaBy", "JJs5m", "vLsmS", + "No0Qb", "JXljW", "Waim6", "MezSW", "l83Ud", "SjskQ", "uPX7G", "5nmWv", "ZhwG1", "uTacx", + "t98iW", "JkzUn", "fmIK1", "i7WMQ", "bgJAz", "n1pmO", "jS1aj", "4W0Tl", "Yf2Ec", "sqVrf", + "MojnP", "qQxHP", "pWiOs", "yToGW", "kB5nP", "BpYhV", "Cfgr3", "zbIYY", "VLTy6", "he9IA", + "lm0pD", "WreyP", "8hJdt", "QnJ1S", "n8pJ9", "iqv4k", "OUYuF", "8cVD3", "sx5Gl", "cQOnv", + "wiHrZ", "oGu6x", "7fsYM", "gf8rI", "7fKYU", "pT8wu", "lCMxy", "prNT6", "5Drn0", "guMb8", + "OxWIH", "uZPqg", "SbRYy", "In3NS", "uvf7A", "FLsph", "pmeCd", "BbwgA", "ru4UG", "YOfrY", + "W7cTs", "K4GS8", "AOgEe", "618Di", "dpe1v", "3otm6", "oVQp6", "5Mg9r", "Y1mC0", "VIlwP", + "aFFss", "Mkgy8", "pv0i7", "S77LH", "XyPZN", "QYxC0", "vkCHH", "MGlTF", "24ARF", "v2eC3", + "ZUnqt", "HfyNQ", "FjHXR", "45cIH", "1LB1L", "zqH0W", "fLNg8", "oQ87r", "Cp3mZ", "Zv7z0", + "O3iyQ", "EOE1o", "5ZaEz", "tlILt", "MmsIo", "lXFOB", "gtCA5", "yEfy9", "7X3uy", "d7vjM", + "XflUq", "Fhtgl", "NOHsz", "GWqqX", "xciqp", "BFkb8", "P6bcg", "lViBv", "2TRI7", "2hEEU", + "9XyT9", "29QAz", "U3yw5", "FxX9q", "C2Irc", "8U2nU", "m4bxU", "5iGN5", "mX2GE", "cShY2", + "JRJQB", "yvOMI", "4QMc9", "NAFuw", "RmDcr", "faHir", "2ZHdk", "zY1GY", "a00b5", "ZuDtD", + "JIqXi", "K20wK", "gdQsS", "5Namm", "lkMUA", "IBe8k", "FcWrW", "FFDui", "tuDyS", "ZJTXH", + "AkKTk", "zQt6Q", "FNYIM", "RpBQm", "RsQUq", "Mm8si", "gjUTu", "zz4ZU", "jiVBP", "ReKEW", + "5VZjS", "YjB9t", "zFgtB", "8TxD7", "euZA5", "MK07Y", "CK5W7", "16lHc", "6q6L9", "Z4I1v", + "UlU3M", "SWfou", "0PktI", "55rfB", "jfREu", "580YD", "Uvlv4", "KASQ8", "AmdQd", "piJSk", + "hE1Ql", "LDk6f", "NcICA", "IKxdL", "iwzGk", "uN6r3", "lsQGo", "QClRL", "iKqhr", "FGzgp", + "RkQke", "b29RJ", "CIShG", "9eoRc", "F6PT2", "LbRTH", "M3zXL", "GXdoH", "IjTwP", "RBhp0", + "yluBx", "mz8gx", "MmKGJ", "Q6Lix", "uupzk", "RACuj", "d85a9", "qaofN", "kZANm", "jtn0X", + "lpF6W", "suY4x", "rz7Ut", "wDajX", "1v5hH", "Yw2oU", "ksJby", "WMiS3", "lj07Q", "EdBKc", + "6AFT0", "0YAGH", "ThjNn", "JKWYR", "9iGoT", "UmaEv", "3weIF", "CdyBV", "pAhR1", "djsrv", + "xReec", "8FmFH", "Dz1R3", "Ta8f6", "DG4sT", "VjCZq", "jSjS3", "Pb1pa", "VNCPd", "Kr8ys", + "AXpwE", "ZzJHW", "Nxx9V", "jzUqR", "dhSuH", "DQimp", "07n1c", "HP433", "RzaZA", "cL0aE", + "Ss0Zu", "FnPFB", "7lUXZ", "9rlg9", "lH1kt", "ni2v1", "48cHL", "soy9t", "WPmlx", "2Nslm", + "hSSvQ", "9y4lw", "ulk41", "ECMvU", "DLhzM", "GrDg7", "x3LDe", "QChxs", "xXTI4", "Gv3Fq", + "rhl0J", "QssNC", "brhlQ", "s93Ml", "tl72W", "pvgjS", "Qworu", "DcpWB", "X6Pin", "J2mQi", + "BGaQY", "CqqaD", "NhXdu", "dQ586", "Yh1hF", "HRxd8", "PYBf4", "64s8N", "tvdkD", "azIWp", + "tAOsr", "v8yFN", "h1zcH", "SmGzv", "bZLvS", "fFDrJ", "Oz8yZ", "0Wr5y", "fcJOy", "7ku1p", + "QbxXc", "VerEA", "QWxoT", "hYBCK", "o8Uyd", "FwEJz", "hi5X7", "uAWyp", "I7p2a", "M6qcG", + "gIYvE", "HzZT8", "iB08l", "StlDJ", "tjQxs", "k85Ae", "taOXK", "s4786", "2DREs", "atef2", + "Vprf2", "VBjhz", "EoToP", "blLA9", "qUJMd", "ydG8U", "8xEKz", "uLtKs", "GSQwj", "S2Dfu", + "ciuWz", "i3pyd", "7Ow5C", "IRh48", "vOqCE", "Q6hMC", "yofH3", "KsjRK", "5IhmG", "fqypy", + "0MR5X", "Chuy3" }; + + private static int _i = 0; + private static int _j = 0; + + static + { + Arrays.sort(KEYS); + } + + public static String getNextKey() + { + if(_j == KEYS.length) + { + _j = 0; + _i++; + if(_i == KEYS.length) + { + _i = 0; + } + } + return new StringBuffer().append(KEYS[_i]).append("-").append(KEYS[_j++]).toString(); + } + } +} diff --git a/java/test-profiles/JavaPre010Excludes b/java/test-profiles/JavaPre010Excludes index 68feaf1e2b..057d7c2c44 100644 --- a/java/test-profiles/JavaPre010Excludes +++ b/java/test-profiles/JavaPre010Excludes @@ -33,6 +33,7 @@ org.apache.qpid.test.client.destination.AddressBasedDestinationTest#* org.apache.qpid.test.client.queue.QueuePolicyTest#testRingPolicy org.apache.qpid.test.client.queue.QueuePolicyTest#testRejectPolicy org.apache.qpid.test.unit.message.JMSPropertiesTest#testApplicationProperties +org.apache.qpid.server.queue.AddressBasedSortedQueueTest#* // Those tests are written against the 0.10 path org.apache.qpid.test.unit.message.UTF8Test#* |
