diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-02-08 17:52:05 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-02-08 17:52:05 +0000 |
commit | db96490d3b7b9ea8643b4f9ce21efdbaaa221b39 (patch) | |
tree | 14e879ee092551d4315301bba2d582e7817dcb3d | |
parent | 977381169b74290411ae3c01f829262cf4c59dba (diff) | |
download | qpid-python-db96490d3b7b9ea8643b4f9ce21efdbaaa221b39.tar.gz |
Modify queue classes to use generics
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-amqp-1-0-management@1566069 13f79535-47bb-0310-9956-ffa450edef68
67 files changed, 1878 insertions, 1333 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index bc670bd848..600c60bdb3 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -43,6 +43,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.DurableConfigurationStoreHelper; +import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -428,10 +429,10 @@ public abstract class AbstractExchange implements Exchange return queues; } - public final int send(final ServerMessage message, + public final <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message, final InstanceProperties instanceProperties, final ServerTransaction txn, - final Action<MessageInstance<? extends Consumer>> postEnqueueAction) + final Action<? super MessageInstance<?, ? extends Consumer>> postEnqueueAction) { List<? extends BaseQueue> queues = route(message, instanceProperties); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java index 33c5218b4c..78b9664cd3 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java @@ -46,6 +46,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -335,10 +336,10 @@ public class DefaultExchange implements Exchange return _id; } - public final int send(final ServerMessage message, + public final <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message, final InstanceProperties instanceProperties, final ServerTransaction txn, - final Action<MessageInstance<? extends Consumer>> postEnqueueAction) + final Action<? super MessageInstance<?, ? extends Consumer>> postEnqueueAction) { final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey()); if(q == null) diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java index 967c629749..62f5b3634b 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.message; import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; @@ -37,8 +38,8 @@ public interface MessageDestination extends MessageNode * @param postEnqueueAction action to perform on the result of every enqueue (may be null) * @return the number of queues in which the message was enqueued performed */ - int send(ServerMessage message, + <M extends ServerMessage<? extends StorableMessageMetaData>> int send(M message, InstanceProperties instanceProperties, ServerTransaction txn, - Action<MessageInstance<? extends Consumer>> postEnqueueAction); + Action<? super MessageInstance<?,? extends Consumer>> postEnqueueAction); } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java index 97cb66cce4..49969ce3c1 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java @@ -30,7 +30,7 @@ import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.StateChangeListener; -public interface MessageInstance<C extends Consumer> +public interface MessageInstance<M extends MessageInstance<M,C>, C extends Consumer> { @@ -45,9 +45,9 @@ public interface MessageInstance<C extends Consumer> void decrementDeliveryCount(); - void addStateChangeListener(StateChangeListener<MessageInstance<C>, State> listener); + void addStateChangeListener(StateChangeListener<? super M,State> listener); - boolean removeStateChangeListener(StateChangeListener<MessageInstance<C>, State> listener); + boolean removeStateChangeListener(StateChangeListener<? super M, State> listener); boolean acquiredByConsumer(); @@ -71,7 +71,7 @@ public interface MessageInstance<C extends Consumer> int getMaximumDeliveryCount(); - int routeToAlternate(Action<MessageInstance<? extends Consumer>> action, ServerTransaction txn); + int routeToAlternate(Action<? super MessageInstance<?, ? extends Consumer>> action, ServerTransaction txn); Filterable asFilterable(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java index 06ff76f103..49b0f2995c 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java @@ -32,17 +32,17 @@ import org.apache.qpid.server.store.TransactionLogResource; import java.util.Collection; import java.util.EnumSet; -public interface MessageSource<C extends Consumer> extends TransactionLogResource, MessageNode +public interface MessageSource<C extends Consumer, S extends MessageSource<C,S>> extends TransactionLogResource, MessageNode { - C addConsumer(ConsumerTarget target, FilterManager filters, + <T extends ConsumerTarget> C addConsumer(T target, FilterManager filters, Class<? extends ServerMessage> messageClass, String consumerName, EnumSet<Consumer.Option> options) throws AMQException; Collection<C> getConsumers(); - void addConsumerRegistrationListener(ConsumerRegistrationListener listener); + void addConsumerRegistrationListener(ConsumerRegistrationListener<S> listener); - void removeConsumerRegistrationListener(ConsumerRegistrationListener listener); + void removeConsumerRegistrationListener(ConsumerRegistrationListener<S> listener); AuthorizationHolder getAuthorizationHolder(); @@ -54,10 +54,10 @@ public interface MessageSource<C extends Consumer> extends TransactionLogResourc boolean isExclusive(); - interface ConsumerRegistrationListener + interface ConsumerRegistrationListener<Q extends MessageSource<? extends Consumer,Q>> { - void consumerAdded(AMQQueue queue, Consumer consumer); - void consumerRemoved(AMQQueue queue, Consumer consumer); + void consumerAdded(Q source, Consumer consumer); + void consumerRemoved(Q queue, Consumer consumer); } /** diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java index d59b13902b..f3d1c8a665 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java @@ -33,6 +33,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQStoreException; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.ConfiguredObjectFinder; import org.apache.qpid.server.model.Exchange; @@ -49,8 +50,9 @@ import org.apache.qpid.server.store.DurableConfigurationStoreHelper; import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.util.MapValueConverter; -final class QueueAdapter extends AbstractAdapter implements Queue, - AMQQueue.ConsumerRegistrationListener, AMQQueue.NotificationListener +final class QueueAdapter<Q extends AMQQueue<?,Q,?>> extends AbstractAdapter implements Queue, + MessageSource.ConsumerRegistrationListener<Q>, + AMQQueue.NotificationListener { @SuppressWarnings("serial") static final Map<String, Type> ATTRIBUTE_TYPES = Collections.unmodifiableMap(new HashMap<String, Type>(){{ @@ -66,10 +68,11 @@ final class QueueAdapter extends AbstractAdapter implements Queue, put(DESCRIPTION, String.class); }}); - private final AMQQueue _queue; + private final AMQQueue<?,Q,?> _queue; + private final Map<Binding, BindingAdapter> _bindingAdapters = new HashMap<Binding, BindingAdapter>(); - private Map<Consumer, ConsumerAdapter> _consumerAdapters = + private final Map<Consumer, ConsumerAdapter> _consumerAdapters = new HashMap<Consumer, ConsumerAdapter>(); @@ -77,7 +80,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue, private QueueStatisticsAdapter _statistics; private QueueNotificationListener _queueNotificationListener; - public QueueAdapter(final VirtualHostAdapter virtualHostAdapter, final AMQQueue queue) + public QueueAdapter(final VirtualHostAdapter virtualHostAdapter, final AMQQueue<?,Q,?> queue) { super(queue.getId(), virtualHostAdapter.getTaskExecutor()); _vhost = virtualHostAdapter; @@ -124,11 +127,10 @@ final class QueueAdapter extends AbstractAdapter implements Queue, private void populateConsumers() { - Collection<Consumer> actualConsumers = _queue.getConsumers(); + Collection<? extends Consumer> actualConsumers = _queue.getConsumers(); synchronized (_consumerAdapters) { - Iterator<Consumer> iter = _consumerAdapters.keySet().iterator(); for(Consumer consumer : actualConsumers) { if(!_consumerAdapters.containsKey(consumer)) @@ -396,9 +398,10 @@ final class QueueAdapter extends AbstractAdapter implements Queue, } else if(LVQ_KEY.equals(name)) { - if(_queue instanceof ConflationQueue) + AMQQueue queue = _queue; + if(queue instanceof ConflationQueue) { - return ((ConflationQueue)_queue).getConflationKey(); + return ((ConflationQueue)queue).getConflationKey(); } } else if(MAXIMUM_DELIVERY_ATTEMPTS.equals(name)) @@ -427,22 +430,24 @@ final class QueueAdapter extends AbstractAdapter implements Queue, } else if(SORT_KEY.equals(name)) { - if(_queue instanceof SortedQueue) + AMQQueue queue = _queue; + if(queue instanceof SortedQueue) { - return ((SortedQueue)_queue).getSortedPropertyName(); + return ((SortedQueue)queue).getSortedPropertyName(); } } else if(TYPE.equals(name)) { - if(_queue instanceof SortedQueue) + AMQQueue queue = _queue; + if(queue instanceof SortedQueue) { return "sorted"; } - if(_queue instanceof ConflationQueue) + if(queue instanceof ConflationQueue) { return "lvq"; } - if(_queue instanceof AMQPriorityQueue) + if(queue instanceof PriorityQueue) { return "priority"; } @@ -486,9 +491,10 @@ final class QueueAdapter extends AbstractAdapter implements Queue, } else if(PRIORITIES.equals(name)) { - if(_queue instanceof AMQPriorityQueue) + AMQQueue queue = _queue; + if(queue instanceof PriorityQueue) { - return ((AMQPriorityQueue)_queue).getPriorities(); + return ((PriorityQueue)queue).getPriorities(); } } return super.getAttribute(name); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java index 75994f6d81..28fadd4162 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java @@ -27,12 +27,11 @@ import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.SimpleAMQQueue; /** * Session model interface. * Extends {@link Comparable} to allow objects to be inserted into a {@link ConcurrentSkipListSet} - * when monitoring the blocking and blocking of queues/sessions in {@link SimpleAMQQueue}. + * when monitoring the blocking and blocking of queues/sessions in {@link AMQQueue}. */ public interface AMQSessionModel extends Comparable<AMQSessionModel> { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 62927edc29..76477a0a9b 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -37,8 +37,8 @@ import java.util.Collection; import java.util.List; import java.util.Set; -public interface AMQQueue<C extends Consumer> extends Comparable<AMQQueue<C>>, ExchangeReferrer, BaseQueue, - MessageSource<C>, CapacityChecker, MessageDestination +public interface AMQQueue<E extends QueueEntry<E,Q,C>, Q extends AMQQueue<E,Q,C>, C extends Consumer> + extends Comparable<Q>, ExchangeReferrer, BaseQueue<C>, MessageSource<C,Q>, CapacityChecker, MessageDestination { public interface NotificationListener @@ -87,41 +87,35 @@ public interface AMQQueue<C extends Consumer> extends Comparable<AMQQueue<C>>, E int getMessageCount(); - int getUndeliveredMessageCount(); - long getQueueDepth(); - long getReceivedMessageCount(); - long getOldestMessageArrivalTime(); boolean isDeleted(); int delete() throws AMQException; - void requeue(QueueEntry entry); + void requeue(E entry); - void dequeue(QueueEntry entry, Consumer sub); + void dequeue(E entry); - void decrementUnackedMsgCount(QueueEntry queueEntry); + void decrementUnackedMsgCount(E queueEntry); - boolean resend(final QueueEntry entry, final Consumer consumer) throws AMQException; + boolean resend(final E entry, final C consumer) throws AMQException; void addQueueDeleteTask(Action<AMQQueue> task); void removeQueueDeleteTask(Action<AMQQueue> task); - List<QueueEntry> getMessagesOnTheQueue(); - - List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId); + List<E> getMessagesOnTheQueue(); List<Long> getMessagesOnTheQueue(int num); List<Long> getMessagesOnTheQueue(int num, int offset); - QueueEntry getMessageOnTheQueue(long messageId); + E getMessageOnTheQueue(long messageId); /** * Returns a list of QueEntries from a given range of queue positions, eg messages 5 to 10 on the queue. @@ -132,9 +126,9 @@ public interface AMQQueue<C extends Consumer> extends Comparable<AMQQueue<C>>, E * @param toPosition * @return */ - public List<QueueEntry> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition); + public List<E> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition); - void visit(QueueEntryVisitor visitor); + void visit(QueueEntryVisitor<E> visitor); long getMaximumMessageSize(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index 38c205bc00..4e0a9048e1 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -288,11 +288,11 @@ public class AMQQueueFactory implements QueueFactory } else if(priorities > 1) { - q = new AMQPriorityQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, priorities); + q = new PriorityQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, priorities); } else { - q = new SimpleAMQQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments); + q = new StandardQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments); } q.setDeleteOnNoConsumers(deleteOnNoConsumer); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java index a9b36c1b24..efc82c0ab4 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java @@ -28,7 +28,7 @@ import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; -public class AssignedConsumerMessageGroupManager implements MessageGroupManager +public class AssignedConsumerMessageGroupManager<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> implements MessageGroupManager<E,Q,L> { private static final Logger _logger = LoggerFactory.getLogger(AssignedConsumerMessageGroupManager.class); @@ -53,25 +53,18 @@ public class AssignedConsumerMessageGroupManager implements MessageGroupManager return val; } - public QueueConsumer getAssignedConsumer(final QueueEntry entry) + public QueueConsumer getAssignedConsumer(final E entry) { Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId); return groupVal == null ? null : _groupMap.get(groupVal.hashCode() & _groupMask); } - public boolean acceptMessage(QueueConsumer sub, QueueEntry entry) + public boolean acceptMessage(QueueConsumer<?,E,Q,L> sub, E entry) { - if(assignMessage(sub, entry)) - { - return entry.acquire(sub); - } - else - { - return false; - } + return assignMessage(sub, entry) && entry.acquire(sub); } - private boolean assignMessage(QueueConsumer sub, QueueEntry entry) + private boolean assignMessage(QueueConsumer sub, E entry) { Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId); if(groupVal == null) @@ -105,16 +98,16 @@ public class AssignedConsumerMessageGroupManager implements MessageGroupManager } } - public QueueEntry findEarliestAssignedAvailableEntry(QueueConsumer sub) + public E findEarliestAssignedAvailableEntry(QueueConsumer sub) { EntryFinder visitor = new EntryFinder(sub); sub.getQueue().visit(visitor); return visitor.getEntry(); } - private class EntryFinder implements QueueEntryVisitor + private class EntryFinder implements QueueEntryVisitor<E> { - private QueueEntry _entry; + private E _entry; private QueueConsumer _sub; public EntryFinder(final QueueConsumer sub) @@ -122,7 +115,7 @@ public class AssignedConsumerMessageGroupManager implements MessageGroupManager _sub = sub; } - public boolean visit(final QueueEntry entry) + public boolean visit(final E entry) { if(!entry.isAvailable()) { @@ -148,7 +141,7 @@ public class AssignedConsumerMessageGroupManager implements MessageGroupManager } } - public QueueEntry getEntry() + public E getEntry() { return _entry; } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java index c1c3bd37e6..31c54dcdd2 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java @@ -28,9 +28,9 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.util.Action; -public interface BaseQueue extends TransactionLogResource +public interface BaseQueue<C extends Consumer> extends TransactionLogResource { - void enqueue(ServerMessage message, Action<MessageInstance<? extends Consumer>> action) throws AMQException; + void enqueue(ServerMessage message, Action<? super MessageInstance<?,C>> action) throws AMQException; boolean isDurable(); boolean isDeleted(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java index c2813bb7a5..a1ff51959c 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java @@ -26,7 +26,7 @@ import java.util.UUID; import org.apache.qpid.server.virtualhost.VirtualHost; -public class ConflationQueue extends SimpleAMQQueue +public class ConflationQueue extends SimpleAMQQueue<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList> { protected ConflationQueue(UUID id, String name, @@ -42,7 +42,7 @@ public class ConflationQueue extends SimpleAMQQueue public String getConflationKey() { - return ((ConflationQueueList) getEntries()).getConflationKey(); + return getEntries().getConflationKey(); } } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java index 7469e95394..a98a4ac144 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java @@ -32,23 +32,38 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; -public class ConflationQueueList extends SimpleQueueEntryList +public class ConflationQueueList extends OrderedQueueEntryList<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList> { private static final Logger LOGGER = LoggerFactory.getLogger(ConflationQueueList.class); + private static final HeadCreator<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList> HEAD_CREATOR = new HeadCreator<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList>() + { + + @Override + public ConflationQueueEntry createHead(final ConflationQueueList list) + { + return list.createHead(); + } + }; + private final String _conflationKey; - private final ConcurrentHashMap<Object, AtomicReference<QueueEntry>> _latestValuesMap = - new ConcurrentHashMap<Object, AtomicReference<QueueEntry>>(); + private final ConcurrentHashMap<Object, AtomicReference<ConflationQueueEntry>> _latestValuesMap = + new ConcurrentHashMap<Object, AtomicReference<ConflationQueueEntry>>(); - private final QueueEntry _deleteInProgress = new SimpleQueueEntryImpl(this); - private final QueueEntry _newerEntryAlreadyBeenAndGone = new SimpleQueueEntryImpl(this); + private final ConflationQueueEntry _deleteInProgress = new ConflationQueueEntry(this); + private final ConflationQueueEntry _newerEntryAlreadyBeenAndGone = new ConflationQueueEntry(this); - public ConflationQueueList(AMQQueue<QueueConsumer> queue, String conflationKey) + public ConflationQueueList(ConflationQueue queue, String conflationKey) { - super(queue); + super(queue, HEAD_CREATOR); _conflationKey = conflationKey; } + private ConflationQueueEntry createHead() + { + return new ConflationQueueEntry(this); + } + public String getConflationKey() { return _conflationKey; @@ -66,7 +81,7 @@ public class ConflationQueueList extends SimpleQueueEntryList @Override public ConflationQueueEntry add(final ServerMessage message) { - final ConflationQueueEntry addedEntry = (ConflationQueueEntry) (super.add(message)); + final ConflationQueueEntry addedEntry = super.add(message); final Object keyValue = message.getMessageHeader().getHeader(_conflationKey); if (keyValue != null) @@ -76,14 +91,14 @@ public class ConflationQueueList extends SimpleQueueEntryList LOGGER.debug("Adding entry " + addedEntry + " for message " + message.getMessageNumber() + " with conflation key " + keyValue); } - final AtomicReference<QueueEntry> referenceToEntry = new AtomicReference<QueueEntry>(addedEntry); - AtomicReference<QueueEntry> entryReferenceFromMap = null; - QueueEntry entryFromMap; + final AtomicReference<ConflationQueueEntry> referenceToEntry = new AtomicReference<ConflationQueueEntry>(addedEntry); + AtomicReference<ConflationQueueEntry> entryReferenceFromMap; + ConflationQueueEntry entryFromMap; // Iterate until we have got a valid atomic reference object and either the referent is newer than the current // entry, or the current entry has replaced it in the reference. Note that the _deletedEntryPlaceholder is a special value // indicating that the reference object is no longer valid (it is being removed from the map). - boolean keepTryingToUpdateEntryReference = true; + boolean keepTryingToUpdateEntryReference; do { do @@ -139,16 +154,16 @@ public class ConflationQueueList extends SimpleQueueEntryList * adds and removes during execution of this method.</li> * </ul> */ - private AtomicReference<QueueEntry> getOrPutIfAbsent(final Object key, final AtomicReference<QueueEntry> referenceToAddedValue) + private AtomicReference<ConflationQueueEntry> getOrPutIfAbsent(final Object key, final AtomicReference<ConflationQueueEntry> referenceToAddedValue) { - AtomicReference<QueueEntry> latestValueReference = _latestValuesMap.putIfAbsent(key, referenceToAddedValue); + AtomicReference<ConflationQueueEntry> latestValueReference = _latestValuesMap.putIfAbsent(key, referenceToAddedValue); if(latestValueReference == null) { latestValueReference = _latestValuesMap.get(key); if(latestValueReference == null) { - return new AtomicReference<QueueEntry>(_newerEntryAlreadyBeenAndGone); + return new AtomicReference<ConflationQueueEntry>(_newerEntryAlreadyBeenAndGone); } } return latestValueReference; @@ -177,12 +192,17 @@ public class ConflationQueueList extends SimpleQueueEntryList } } - private final class ConflationQueueEntry extends SimpleQueueEntryImpl + final class ConflationQueueEntry extends OrderedQueueEntry<ConflationQueueEntry, ConflationQueue, ConflationQueueList> { - private AtomicReference<QueueEntry> _latestValueReference; + private AtomicReference<ConflationQueueEntry> _latestValueReference; + + private ConflationQueueEntry(final ConflationQueueList queueEntryList) + { + super(queueEntryList); + } - public ConflationQueueEntry(SimpleQueueEntryList queueEntryList, ServerMessage message) + public ConflationQueueEntry(ConflationQueueList queueEntryList, ServerMessage message) { super(queueEntryList, message); } @@ -206,7 +226,7 @@ public class ConflationQueueList extends SimpleQueueEntryList } - public void setLatestValueReference(final AtomicReference<QueueEntry> latestValueReference) + public void setLatestValueReference(final AtomicReference<ConflationQueueEntry> latestValueReference) { _latestValueReference = latestValueReference; } @@ -227,12 +247,12 @@ public class ConflationQueueList extends SimpleQueueEntryList /** * Exposed purposes of unit test only. */ - Map<Object, AtomicReference<QueueEntry>> getLatestValuesMap() + Map<Object, AtomicReference<ConflationQueueEntry>> getLatestValuesMap() { return Collections.unmodifiableMap(_latestValuesMap); } - static class Factory implements QueueEntryListFactory + static class Factory implements QueueEntryListFactory<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList> { private final String _conflationKey; @@ -241,7 +261,8 @@ public class ConflationQueueList extends SimpleQueueEntryList _conflationKey = conflationKey; } - public ConflationQueueList createQueueEntryList(AMQQueue queue) + @Override + public ConflationQueueList createQueueEntryList(final ConflationQueue queue) { return new ConflationQueueList(queue, _conflationKey); } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java index 4c74e5ba0b..e67591ae07 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java @@ -32,22 +32,22 @@ import org.apache.qpid.server.message.ServerMessage; import java.util.HashMap; import java.util.Map; -public class DefinedGroupMessageGroupManager implements MessageGroupManager +public class DefinedGroupMessageGroupManager<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> implements MessageGroupManager<E,Q,L> { private static final Logger _logger = LoggerFactory.getLogger(DefinedGroupMessageGroupManager.class); private final String _groupId; private final String _defaultGroup; private final Map<Object, Group> _groupMap = new HashMap<Object, Group>(); - private final ConsumerResetHelper _resetHelper; + private final ConsumerResetHelper<E,Q,L> _resetHelper; private final class Group { private final Object _group; - private QueueConsumer _consumer; + private QueueConsumer<?,E,Q,L> _consumer; private int _activeCount; - private Group(final Object key, final QueueConsumer consumer) + private Group(final Object key, final QueueConsumer<?,E,Q,L> consumer) { _group = key; _consumer = consumer; @@ -104,7 +104,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager return !(_consumer == null || (_activeCount == 0 && _consumer.isClosed())); } - public QueueConsumer getConsumer() + public QueueConsumer<?,E,Q,L> getConsumer() { return _consumer; } @@ -120,14 +120,14 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager } } - public DefinedGroupMessageGroupManager(final String groupId, String defaultGroup, ConsumerResetHelper resetHelper) + public DefinedGroupMessageGroupManager(final String groupId, String defaultGroup, ConsumerResetHelper<E,Q,L> resetHelper) { _groupId = groupId; _defaultGroup = defaultGroup; _resetHelper = resetHelper; } - public synchronized QueueConsumer getAssignedConsumer(final QueueEntry entry) + public synchronized QueueConsumer<?,E,Q,L> getAssignedConsumer(final E entry) { Object groupId = getKey(entry); @@ -135,19 +135,12 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager return group == null || !group.isValid() ? null : group.getConsumer(); } - public synchronized boolean acceptMessage(final QueueConsumer sub, final QueueEntry entry) + public synchronized boolean acceptMessage(final QueueConsumer<?,E,Q,L> sub, final E entry) { - if(assignMessage(sub, entry)) - { - return entry.acquire(sub); - } - else - { - return false; - } + return assignMessage(sub, entry) && entry.acquire(sub); } - private boolean assignMessage(final QueueConsumer sub, final QueueEntry entry) + private boolean assignMessage(final QueueConsumer<?,E,Q,L> sub, final E entry) { Object groupId = getKey(entry); Group group = _groupMap.get(groupId); @@ -171,7 +164,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager if(assignedSub == sub) { - entry.addStateChangeListener(new GroupStateChangeListener(group, entry)); + entry.addStateChangeListener(new GroupStateChangeListener(group)); return true; } else @@ -180,16 +173,16 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager } } - public synchronized QueueEntry findEarliestAssignedAvailableEntry(final QueueConsumer sub) + public synchronized E findEarliestAssignedAvailableEntry(final QueueConsumer<?,E,Q,L> sub) { EntryFinder visitor = new EntryFinder(sub); sub.getQueue().visit(visitor); return visitor.getEntry(); } - private class EntryFinder implements QueueEntryVisitor + private class EntryFinder implements QueueEntryVisitor<E> { - private QueueEntry _entry; + private E _entry; private QueueConsumer _sub; public EntryFinder(final QueueConsumer sub) @@ -197,7 +190,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager _sub = sub; } - public boolean visit(final QueueEntry entry) + public boolean visit(final E entry) { if(!entry.isAvailable()) { @@ -218,7 +211,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager } } - public QueueEntry getEntry() + public E getEntry() { return _entry; } @@ -229,7 +222,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager { } - private Object getKey(QueueEntry entry) + private Object getKey(E entry) { ServerMessage message = entry.getMessage(); AMQMessageHeader messageHeader = message == null ? null : message.getMessageHeader(); @@ -241,17 +234,16 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager return groupVal; } - private class GroupStateChangeListener implements StateChangeListener<MessageInstance<QueueConsumer>, QueueEntry.State> + private class GroupStateChangeListener implements StateChangeListener<MessageInstance<?, ? extends QueueConsumer>, QueueEntry.State> { private final Group _group; - public GroupStateChangeListener(final Group group, - final MessageInstance<QueueConsumer> entry) + public GroupStateChangeListener(final Group group) { _group = group; } - public void stateChanged(final MessageInstance<QueueConsumer> entry, + public void stateChanged(final MessageInstance<?, ? extends QueueConsumer> entry, final MessageInstance.State oldState, final MessageInstance.State newState) { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java index 740a96bf2d..ba9dbc8a70 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java @@ -20,20 +20,20 @@ */ package org.apache.qpid.server.queue; -public interface MessageGroupManager +public interface MessageGroupManager<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> { - public interface ConsumerResetHelper + public interface ConsumerResetHelper<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> { - public void resetSubPointersForGroups(QueueConsumer consumer, boolean clearAssignments); + public void resetSubPointersForGroups(QueueConsumer<?,E,Q,L> consumer, boolean clearAssignments); - boolean isEntryAheadOfConsumer(QueueEntry entry, QueueConsumer sub); + boolean isEntryAheadOfConsumer(E entry, QueueConsumer<?,E,Q,L> sub); } - QueueConsumer getAssignedConsumer(QueueEntry entry); + QueueConsumer getAssignedConsumer(E entry); - boolean acceptMessage(QueueConsumer sub, QueueEntry entry); + boolean acceptMessage(QueueConsumer<?,E,Q,L> sub, E entry); - QueueEntry findEarliestAssignedAvailableEntry(QueueConsumer sub); + E findEarliestAssignedAvailableEntry(QueueConsumer<?,E,Q,L> sub); void clearAssignments(QueueConsumer sub); } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntry.java index 251a1f55ed..369f42b183 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntry.java @@ -24,46 +24,46 @@ import org.apache.qpid.server.message.ServerMessage; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -public class SimpleQueueEntryImpl extends QueueEntryImpl +public abstract class OrderedQueueEntry<E extends OrderedQueueEntry<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends OrderedQueueEntryList<E,Q,L>> extends QueueEntryImpl<E,Q,L> { - static final AtomicReferenceFieldUpdater<SimpleQueueEntryImpl, SimpleQueueEntryImpl> + static final AtomicReferenceFieldUpdater<OrderedQueueEntry, OrderedQueueEntry> _nextUpdater = AtomicReferenceFieldUpdater.newUpdater - (SimpleQueueEntryImpl.class, SimpleQueueEntryImpl.class, "_next"); + (OrderedQueueEntry.class, OrderedQueueEntry.class, "_next"); - private volatile SimpleQueueEntryImpl _next; + private volatile E _next; - public SimpleQueueEntryImpl(SimpleQueueEntryList queueEntryList) + public OrderedQueueEntry(L queueEntryList) { super(queueEntryList); } - public SimpleQueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message, final long entryId) + public OrderedQueueEntry(L queueEntryList, ServerMessage message, final long entryId) { super(queueEntryList, message, entryId); } - public SimpleQueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message) + public OrderedQueueEntry(L queueEntryList, ServerMessage message) { super(queueEntryList, message); } - public SimpleQueueEntryImpl getNextNode() + public E getNextNode() { return _next; } - public SimpleQueueEntryImpl getNextValidEntry() + public E getNextValidEntry() { - SimpleQueueEntryImpl next = getNextNode(); + E next = getNextNode(); while(next != null && next.isDeleted()) { - final SimpleQueueEntryImpl newNext = next.getNextNode(); + final E newNext = next.getNextNode(); if(newNext != null) { - SimpleQueueEntryList._nextUpdater.compareAndSet(this,next, newNext); + OrderedQueueEntryList._nextUpdater.compareAndSet(this,next, newNext); next = getNextNode(); } else diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java new file mode 100644 index 0000000000..f2491bdb0c --- /dev/null +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java @@ -0,0 +1,198 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.message.ServerMessage; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +public abstract class OrderedQueueEntryList<E extends OrderedQueueEntry<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends OrderedQueueEntryList<E,Q,L>> implements SimpleQueueEntryList<E,Q,L> +{ + + private final E _head; + + private volatile E _tail; + + static final AtomicReferenceFieldUpdater<OrderedQueueEntryList, OrderedQueueEntry> + _tailUpdater = + AtomicReferenceFieldUpdater.newUpdater + (OrderedQueueEntryList.class, OrderedQueueEntry.class, "_tail"); + + + private final Q _queue; + + static final AtomicReferenceFieldUpdater<OrderedQueueEntry, OrderedQueueEntry> + _nextUpdater = OrderedQueueEntry._nextUpdater; + + private AtomicLong _scavenges = new AtomicLong(0L); + private final long _scavengeCount = Integer.getInteger("qpid.queue.scavenge_count", 50); + private final AtomicReference<E> _unscavengedHWM = new AtomicReference<E>(); + + + public OrderedQueueEntryList(Q queue, HeadCreator<E,Q,L> headCreator) + { + _queue = queue; + _head = headCreator.createHead((L)this); + _tail = _head; + } + + void scavenge() + { + E hwm = _unscavengedHWM.getAndSet(null); + E next = _head.getNextValidEntry(); + + if(hwm != null) + { + while (next != null && hwm.compareTo(next)>0) + { + next = next.getNextValidEntry(); + } + } + } + + + public Q getQueue() + { + return _queue; + } + + + public E add(ServerMessage message) + { + E node = createQueueEntry(message); + for (;;) + { + OrderedQueueEntry tail = _tail; + OrderedQueueEntry next = tail.getNextNode(); + if (tail == _tail) + { + if (next == null) + { + node.setEntryId(tail.getEntryId()+1); + if (_nextUpdater.compareAndSet(tail, null, node)) + { + _tailUpdater.compareAndSet(this, tail, node); + + return node; + } + } + else + { + _tailUpdater.compareAndSet(this,tail, next); + } + } + } + } + + abstract protected E createQueueEntry(ServerMessage<?> message); + + public E next(E node) + { + return node.getNextValidEntry(); + } + + public static interface HeadCreator<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> + { + E createHead(L list); + } + + public static class QueueEntryIteratorImpl<E extends OrderedQueueEntry<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends OrderedQueueEntryList<E,Q,L>> implements QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> + { + private E _lastNode; + + QueueEntryIteratorImpl(E startNode) + { + _lastNode = startNode; + } + + public boolean atTail() + { + return _lastNode.getNextValidEntry() == null; + } + + public E getNode() + { + return _lastNode; + } + + public boolean advance() + { + E nextValidNode = _lastNode.getNextValidEntry(); + + if(nextValidNode != null) + { + _lastNode = nextValidNode; + } + + return nextValidNode != null; + } + } + + public QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> iterator() + { + return new QueueEntryIteratorImpl<E,Q,L>(_head); + } + + + public E getHead() + { + return _head; + } + + public void entryDeleted(E queueEntry) + { + E next = _head.getNextNode(); + E newNext = _head.getNextValidEntry(); + + // the head of the queue has not been deleted, hence the deletion must have been mid queue. + if (next == newNext) + { + E unscavengedHWM = _unscavengedHWM.get(); + while(unscavengedHWM == null || unscavengedHWM.compareTo(queueEntry)<0) + { + _unscavengedHWM.compareAndSet(unscavengedHWM, queueEntry); + unscavengedHWM = _unscavengedHWM.get(); + } + if (_scavenges.incrementAndGet() > _scavengeCount) + { + _scavenges.set(0L); + scavenge(); + } + } + else + { + E unscavengedHWM = _unscavengedHWM.get(); + if(unscavengedHWM != null && (next == null || unscavengedHWM.compareTo(next) < 0)) + { + _unscavengedHWM.compareAndSet(unscavengedHWM, null); + } + } + } + + public int getPriorities() + { + return 0; + } + + +} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java index 6918ae683c..7cf245c8f8 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java @@ -25,30 +25,30 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Map; import java.util.UUID; -public abstract class OutOfOrderQueue extends SimpleAMQQueue +public abstract class OutOfOrderQueue<E extends QueueEntryImpl<E,Q,L>, Q extends OutOfOrderQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> extends SimpleAMQQueue<E,Q,L> { protected OutOfOrderQueue(UUID id, String name, boolean durable, String owner, boolean autoDelete, boolean exclusive, - VirtualHost virtualHost, QueueEntryListFactory entryListFactory, Map<String, Object> arguments) + VirtualHost virtualHost, QueueEntryListFactory<E,Q,L> entryListFactory, Map<String, Object> arguments) { super(id, name, durable, owner, autoDelete, exclusive, virtualHost, entryListFactory, arguments); } @Override - protected void checkConsumersNotAheadOfDelivery(final QueueEntry entry) + protected void checkConsumersNotAheadOfDelivery(final E entry) { // check that all consumers are not in advance of the entry - QueueConsumerList.ConsumerNodeIterator subIter = getConsumerList().iterator(); + QueueConsumerList.ConsumerNodeIterator<E,Q,L> subIter = getConsumerList().iterator(); while(subIter.advance() && !entry.isAcquired()) { - final QueueConsumer consumer = subIter.getNode().getConsumer(); + final QueueConsumer<?,E,Q,L> consumer = subIter.getNode().getConsumer(); if(!consumer.isClosed()) { - QueueContext context = consumer.getQueueContext(); + QueueContext<E,Q,L> context = consumer.getQueueContext(); if(context != null) { - QueueEntry released = context.getReleasedEntry(); + E released = context.getReleasedEntry(); while(!entry.isAcquired() && (released == null || released.compareTo(entry) > 0)) { if(QueueContext._releasedUpdater.compareAndSet(context,released,entry)) diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java index 46c2a635b7..4440d045d1 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java @@ -25,22 +25,22 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Map; import java.util.UUID; -public class AMQPriorityQueue extends OutOfOrderQueue +public class PriorityQueue extends OutOfOrderQueue<PriorityQueueList.PriorityQueueEntry, PriorityQueue, PriorityQueueList> { - protected AMQPriorityQueue(UUID id, - final String name, - final boolean durable, - final String owner, - final boolean autoDelete, - boolean exclusive, - final VirtualHost virtualHost, - Map<String, Object> arguments, int priorities) + protected PriorityQueue(UUID id, + final String name, + final boolean durable, + final String owner, + final boolean autoDelete, + boolean exclusive, + final VirtualHost virtualHost, + Map<String, Object> arguments, int priorities) { super(id, name, durable, owner, autoDelete, exclusive, virtualHost, new PriorityQueueList.Factory(priorities), arguments); } public int getPriorities() { - return ((PriorityQueueList) getEntries()).getPriorities(); + return getEntries().getPriorities(); } } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java index 05d84327d4..e877983643 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java @@ -22,132 +22,162 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.message.ServerMessage; -public class PriorityQueueList implements QueueEntryList<SimpleQueueEntryImpl> +abstract public class PriorityQueueList extends OrderedQueueEntryList<PriorityQueueList.PriorityQueueEntry, PriorityQueue, PriorityQueueList> { - private final AMQQueue<QueueConsumer> _queue; - private final PriorityQueueEntrySubList[] _priorityLists; - private final int _priorities; - private final int _priorityOffset; - public PriorityQueueList(AMQQueue queue, int priorities) - { - _queue = queue; - _priorityLists = new PriorityQueueEntrySubList[priorities]; - _priorities = priorities; - _priorityOffset = 5-((priorities + 1)/2); - for(int i = 0; i < priorities; i++) - { - _priorityLists[i] = new PriorityQueueEntrySubList(queue, i); - } - } - public int getPriorities() + public PriorityQueueList(final PriorityQueue queue, + final HeadCreator<PriorityQueueEntry, PriorityQueue, PriorityQueueList> headCreator) { - return _priorities; + super(queue, headCreator); } - public AMQQueue<QueueConsumer> getQueue() + static class PriorityQueueMasterList extends PriorityQueueList { - return _queue; - } + private static final HeadCreator<PriorityQueueEntry, PriorityQueue, PriorityQueueList> DUMMY_HEAD_CREATOR = + new HeadCreator<PriorityQueueEntry, PriorityQueue, PriorityQueueList>() + { + @Override + public PriorityQueueEntry createHead(final PriorityQueueList list) + { + return null; + } + }; + private final PriorityQueue _queue; + private final PriorityQueueEntrySubList[] _priorityLists; + private final int _priorities; + private final int _priorityOffset; - public SimpleQueueEntryImpl add(ServerMessage message) - { - int index = message.getMessageHeader().getPriority() - _priorityOffset; - if(index >= _priorities) + public PriorityQueueMasterList(PriorityQueue queue, int priorities) { - index = _priorities-1; + super(queue, DUMMY_HEAD_CREATOR); + _queue = queue; + _priorityLists = new PriorityQueueEntrySubList[priorities]; + _priorities = priorities; + _priorityOffset = 5-((priorities + 1)/2); + for(int i = 0; i < priorities; i++) + { + _priorityLists[i] = new PriorityQueueEntrySubList(queue, i); + } } - else if(index < 0) + + public int getPriorities() { - index = 0; + return _priorities; } - return _priorityLists[index].add(message); - - } - - public SimpleQueueEntryImpl next(SimpleQueueEntryImpl node) - { - SimpleQueueEntryImpl next = node.getNextValidEntry(); - if(next == null) + public PriorityQueue getQueue() { - final QueueEntryList<?> nodeEntryList = node.getQueueEntryList(); - int index; - for(index = _priorityLists.length-1; _priorityLists[index] != nodeEntryList; index--) {}; + return _queue; + } - while(next == null && index != 0) + public PriorityQueueEntry add(ServerMessage message) + { + int index = message.getMessageHeader().getPriority() - _priorityOffset; + if(index >= _priorities) + { + index = _priorities-1; + } + else if(index < 0) { - index--; - next = _priorityLists[index].getHead().getNextValidEntry(); + index = 0; } + return _priorityLists[index].add(message); } - return next; - } - private final class PriorityQueueEntryListIterator implements QueueEntryIterator<SimpleQueueEntryImpl> - { - private final SimpleQueueEntryList.QueueEntryIteratorImpl[] _iterators = new SimpleQueueEntryList.QueueEntryIteratorImpl[ _priorityLists.length ]; - private SimpleQueueEntryImpl _lastNode; + @Override + protected PriorityQueueEntry createQueueEntry(final ServerMessage<?> message) + { + throw new UnsupportedOperationException(); + } - PriorityQueueEntryListIterator() + public PriorityQueueEntry next(PriorityQueueEntry node) { - for(int i = 0; i < _priorityLists.length; i++) + PriorityQueueEntry next = node.getNextValidEntry(); + + if(next == null) { - _iterators[i] = _priorityLists[i].iterator(); + final PriorityQueueList nodeEntryList = node.getQueueEntryList(); + int index; + for(index = _priorityLists.length-1; _priorityLists[index] != nodeEntryList; index--) + { + // do nothing loop is just to find the index + } + + while(next == null && index != 0) + { + index--; + next = _priorityLists[index].getHead().getNextValidEntry(); + } + } - _lastNode = _iterators[_iterators.length - 1].getNode(); + return next; } - - public boolean atTail() + private final class PriorityQueueEntryListIterator implements QueueEntryIterator<PriorityQueueEntry, PriorityQueue, PriorityQueueList, QueueConsumer<?,PriorityQueueEntry, PriorityQueue, PriorityQueueList>> { - for(int i = 0; i < _iterators.length; i++) + private final QueueEntryIterator<PriorityQueueEntry, PriorityQueue, PriorityQueueList,QueueConsumer<?,PriorityQueueEntry, PriorityQueue, PriorityQueueList>>[] _iterators = new QueueEntryIterator[ _priorityLists.length ]; + private PriorityQueueEntry _lastNode; + + PriorityQueueEntryListIterator() { - if(!_iterators[i].atTail()) + for(int i = 0; i < _priorityLists.length; i++) { - return false; + _iterators[i] = _priorityLists[i].iterator(); } + _lastNode = _iterators[_iterators.length - 1].getNode(); } - return true; - } - public SimpleQueueEntryImpl getNode() - { - return _lastNode; - } - public boolean advance() - { - for(int i = _iterators.length-1; i >= 0; i--) + public boolean atTail() { - if(_iterators[i].advance()) + for(int i = 0; i < _iterators.length; i++) { - _lastNode = _iterators[i].getNode(); - return true; + if(!_iterators[i].atTail()) + { + return false; + } } + return true; + } + + public PriorityQueueEntry getNode() + { + return _lastNode; + } + + public boolean advance() + { + for(int i = _iterators.length-1; i >= 0; i--) + { + if(_iterators[i].advance()) + { + _lastNode = _iterators[i].getNode(); + return true; + } + } + return false; } - return false; } - } - public PriorityQueueEntryListIterator iterator() - { - return new PriorityQueueEntryListIterator(); - } + public PriorityQueueEntryListIterator iterator() + { - public SimpleQueueEntryImpl getHead() - { - return _priorityLists[_priorities-1].getHead(); - } + return new PriorityQueueEntryListIterator(); + } - public void entryDeleted(final SimpleQueueEntryImpl queueEntry) - { + public PriorityQueueEntry getHead() + { + return _priorityLists[_priorities-1].getHead(); + } - } + public void entryDeleted(final PriorityQueueEntry queueEntry) + { - static class Factory implements QueueEntryListFactory + } + } + static class Factory implements QueueEntryListFactory<PriorityQueueEntry, PriorityQueue, PriorityQueueList> { private final int _priorities; @@ -156,26 +186,34 @@ public class PriorityQueueList implements QueueEntryList<SimpleQueueEntryImpl> _priorities = priorities; } - public PriorityQueueList createQueueEntryList(AMQQueue queue) + public PriorityQueueList createQueueEntryList(PriorityQueue queue) { - return new PriorityQueueList(queue, _priorities); + return new PriorityQueueMasterList(queue, _priorities); } } - private static class PriorityQueueEntrySubList extends SimpleQueueEntryList + static class PriorityQueueEntrySubList extends PriorityQueueList { + private static final HeadCreator<PriorityQueueEntry, PriorityQueue, PriorityQueueList> HEAD_CREATOR = new HeadCreator<PriorityQueueEntry, PriorityQueue, PriorityQueueList>() + { + @Override + public PriorityQueueEntry createHead(final PriorityQueueList list) + { + return new PriorityQueueEntry(list); + } + }; private int _listPriority; - public PriorityQueueEntrySubList(AMQQueue<QueueConsumer> queue, int listPriority) + public PriorityQueueEntrySubList(PriorityQueue queue, int listPriority) { - super(queue); + super(queue, HEAD_CREATOR); _listPriority = listPriority; } @Override - protected PriorityQueueEntryImpl createQueueEntry(ServerMessage<?> message) + protected PriorityQueueEntry createQueueEntry(ServerMessage<?> message) { - return new PriorityQueueEntryImpl(this, message); + return new PriorityQueueEntry(this, message); } public int getListPriority() @@ -184,17 +222,22 @@ public class PriorityQueueList implements QueueEntryList<SimpleQueueEntryImpl> } } - private static class PriorityQueueEntryImpl extends SimpleQueueEntryImpl + static class PriorityQueueEntry extends OrderedQueueEntry<PriorityQueueEntry, PriorityQueue, PriorityQueueList> { - public PriorityQueueEntryImpl(PriorityQueueEntrySubList queueEntryList, ServerMessage<?> message) + private PriorityQueueEntry(final PriorityQueueList queueEntryList) + { + super(queueEntryList); + } + + public PriorityQueueEntry(PriorityQueueEntrySubList queueEntryList, ServerMessage<?> message) { super(queueEntryList, message); } @Override - public int compareTo(final QueueEntry o) + public int compareTo(final PriorityQueueEntry o) { - PriorityQueueEntrySubList pqel = (PriorityQueueEntrySubList)((PriorityQueueEntryImpl)o).getQueueEntryList(); + PriorityQueueEntrySubList pqel = (PriorityQueueEntrySubList)o.getQueueEntryList(); int otherPriority = pqel.getListPriority(); int thisPriority = ((PriorityQueueEntrySubList) getQueueEntryList()).getListPriority(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java index ff7840255a..5c891797f5 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java @@ -47,7 +47,7 @@ import java.util.concurrent.locks.ReentrantLock; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT; -class QueueConsumer<T extends ConsumerTarget> implements Consumer +class QueueConsumer<T extends ConsumerTarget, E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> implements Consumer { public static enum State @@ -61,10 +61,9 @@ class QueueConsumer<T extends ConsumerTarget> implements Consumer private final AtomicBoolean _targetClosed = new AtomicBoolean(false); private final AtomicBoolean _closed = new AtomicBoolean(false); private final long _id; - private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE); private final Lock _stateChangeLock = new ReentrantLock(); private final long _createTime = System.currentTimeMillis(); - private final MessageInstance.ConsumerAcquiredState _owningState = new MessageInstance.ConsumerAcquiredState(this); + private final MessageInstance.ConsumerAcquiredState<QueueConsumer<T,E,Q,L>> _owningState = new MessageInstance.ConsumerAcquiredState<QueueConsumer<T,E,Q,L>>(this); private final boolean _acquires; private final boolean _seesRequeues; private final String _consumerName; @@ -74,8 +73,10 @@ class QueueConsumer<T extends ConsumerTarget> implements Consumer private final FilterManager _filters; private final Class<? extends ServerMessage> _messageClass; private final Object _sessionReference; - private SimpleAMQQueue _queue; - private GenericActor _logActor; + private Q _queue; + private GenericActor _logActor = new GenericActor("[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getId()) + + "(UNKNOWN)" + + "] "); static final EnumMap<ConsumerTarget.State, State> STATE_MAP = new EnumMap<ConsumerTarget.State, State>(ConsumerTarget.State.class); @@ -89,10 +90,10 @@ class QueueConsumer<T extends ConsumerTarget> implements Consumer private final T _target; private final SubFlushRunner _runner = new SubFlushRunner(this); - private volatile QueueContext _queueContext; - private StateChangeListener<? extends Consumer, State> _stateListener = new StateChangeListener<Consumer, State>() + private volatile QueueContext<E,Q,L> _queueContext; + private StateChangeListener<? super QueueConsumer<T,E,Q,L>, State> _stateListener = new StateChangeListener<QueueConsumer<T,E,Q,L>, State>() { - public void stateChanged(Consumer sub, State oldState, State newState) + public void stateChanged(QueueConsumer sub, State oldState, State newState) { CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString())); } @@ -158,8 +159,7 @@ class QueueConsumer<T extends ConsumerTarget> implements Consumer throw new RuntimeException(e); } } - final StateChangeListener<Consumer, State> stateListener = - (StateChangeListener<Consumer, State>) getStateListener(); + final StateChangeListener<? super QueueConsumer<T,E,Q,L>, State> stateListener = getStateListener(); if(stateListener != null) { stateListener.stateChanged(this, STATE_MAP.get(oldState), STATE_MAP.get(newState)); @@ -251,12 +251,12 @@ class QueueConsumer<T extends ConsumerTarget> implements Consumer return STATE_MAP.get(_target.getState()); } - public final SimpleAMQQueue getQueue() + public final Q getQueue() { return _queue; } - final void setQueue(SimpleAMQQueue queue, boolean exclusive) + final void setQueue(Q queue, boolean exclusive) { if(getQueue() != null) { @@ -300,9 +300,9 @@ class QueueConsumer<T extends ConsumerTarget> implements Consumer getQueue().flushConsumer(this); } - boolean resend(final MessageInstance entry) throws AMQException + boolean resend(final E entry) throws AMQException { - return getQueue().resend((QueueEntry)entry, this); + return getQueue().resend(entry, this); } final SubFlushRunner getRunner() @@ -315,31 +315,26 @@ class QueueConsumer<T extends ConsumerTarget> implements Consumer return _id; } - public final StateChangeListener<? extends Consumer, State> getStateListener() + public final StateChangeListener<? super QueueConsumer<T,E,Q,L>, State> getStateListener() { return _stateListener; } - public final void setStateListener(StateChangeListener<? extends Consumer, State> listener) + public final void setStateListener(StateChangeListener<? super QueueConsumer<T,E,Q,L>, State> listener) { _stateListener = listener; } - final QueueContext getQueueContext() + final QueueContext<E,Q,L> getQueueContext() { return _queueContext; } - final void setQueueContext(QueueContext queueContext) + final void setQueueContext(QueueContext<E,Q,L> queueContext) { _queueContext = queueContext; } - protected boolean updateState(State from, State to) - { - return _state.compareAndSet(from, to); - } - public final boolean isActive() { return getState() == State.ACTIVE; @@ -355,7 +350,7 @@ class QueueConsumer<T extends ConsumerTarget> implements Consumer _noLocal = noLocal; } - public final boolean hasInterest(MessageInstance entry) + public final boolean hasInterest(E entry) { //check that the message hasn't been rejected if (entry.isRejectedBy(this)) @@ -405,7 +400,6 @@ class QueueConsumer<T extends ConsumerTarget> implements Consumer filterLogString.append(delimiter); } filterLogString.append("Browser"); - hasEntries = true; } return filterLogString.toString(); @@ -431,7 +425,7 @@ class QueueConsumer<T extends ConsumerTarget> implements Consumer return _createTime; } - final MessageInstance.ConsumerAcquiredState getOwningState() + final MessageInstance.ConsumerAcquiredState<QueueConsumer<T,E,Q,L>> getOwningState() { return _owningState; } @@ -466,7 +460,7 @@ class QueueConsumer<T extends ConsumerTarget> implements Consumer return _deliveredCount.longValue(); } - final void send(final QueueEntry entry, final boolean batch) throws AMQException + final void send(final E entry, final boolean batch) throws AMQException { _deliveredCount.incrementAndGet(); ServerMessage message = entry.getMessage(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java index 82e9d58cf3..2cf0e93e9c 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java @@ -24,19 +24,19 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -class QueueConsumerList +class QueueConsumerList<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E, Q,L>, L extends SimpleQueueEntryList<E,Q,L>> { - private final ConsumerNode _head = new ConsumerNode(); + private final ConsumerNode<E,Q,L> _head = new ConsumerNode<E,Q,L>(); - private final AtomicReference<ConsumerNode> _tail = new AtomicReference<ConsumerNode>(_head); - private final AtomicReference<ConsumerNode> _subNodeMarker = new AtomicReference<ConsumerNode>(_head); + private final AtomicReference<ConsumerNode<E,Q,L>> _tail = new AtomicReference<ConsumerNode<E,Q,L>>(_head); + private final AtomicReference<ConsumerNode<E,Q,L>> _subNodeMarker = new AtomicReference<ConsumerNode<E,Q,L>>(_head); private final AtomicInteger _size = new AtomicInteger(); - public static final class ConsumerNode + public static final class ConsumerNode<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E, Q,L>, L extends SimpleQueueEntryList<E,Q,L>> { private final AtomicBoolean _deleted = new AtomicBoolean(); - private final AtomicReference<ConsumerNode> _next = new AtomicReference<ConsumerNode>(); - private final QueueConsumer _sub; + private final AtomicReference<ConsumerNode<E,Q,L>> _next = new AtomicReference<ConsumerNode<E,Q,L>>(); + private final QueueConsumer<?,E,Q,L> _sub; public ConsumerNode() { @@ -45,7 +45,7 @@ class QueueConsumerList _deleted.set(true); } - public ConsumerNode(final QueueConsumer sub) + public ConsumerNode(final QueueConsumer<?,E,Q,L> sub) { //used for regular node construction _sub = sub; @@ -57,12 +57,12 @@ class QueueConsumerList * * @return the next non-deleted node, or null if none was found. */ - public ConsumerNode findNext() + public ConsumerNode<E,Q,L> findNext() { - ConsumerNode next = nextNode(); + ConsumerNode<E,Q,L> next = nextNode(); while(next != null && next.isDeleted()) { - final ConsumerNode newNext = next.nextNode(); + final ConsumerNode<E,Q,L> newNext = next.nextNode(); if(newNext != null) { //try to move our _next reference forward to the 'newNext' @@ -86,7 +86,7 @@ class QueueConsumerList * * @return the immediately next node in the structure, or null if at the tail. */ - protected ConsumerNode nextNode() + protected ConsumerNode<E,Q,L> nextNode() { return _next.get(); } @@ -97,7 +97,7 @@ class QueueConsumerList * @param node the ConsumerNode to set as 'next' * @return whether the operation succeeded */ - private boolean setNext(final ConsumerNode node) + private boolean setNext(final ConsumerNode<E,Q,L> node) { return _next.compareAndSet(null, node); } @@ -112,18 +112,18 @@ class QueueConsumerList return _deleted.compareAndSet(false,true); } - public QueueConsumer getConsumer() + public QueueConsumer<?,E,Q,L> getConsumer() { return _sub; } } - private void insert(final ConsumerNode node, final boolean count) + private void insert(final ConsumerNode<E,Q,L> node, final boolean count) { for (;;) { - ConsumerNode tail = _tail.get(); - ConsumerNode next = tail.nextNode(); + ConsumerNode<E,Q,L> tail = _tail.get(); + ConsumerNode<E,Q,L> next = tail.nextNode(); if (tail == _tail.get()) { if (next == null) @@ -146,16 +146,16 @@ class QueueConsumerList } } - public void add(final QueueConsumer sub) + public void add(final QueueConsumer<?,E,Q,L> sub) { - ConsumerNode node = new ConsumerNode(sub); + ConsumerNode<E,Q,L> node = new ConsumerNode<E,Q,L>(sub); insert(node, true); } - public boolean remove(final QueueConsumer sub) + public boolean remove(final QueueConsumer<?, E,Q,L> sub) { - ConsumerNode prevNode = _head; - ConsumerNode node = _head.nextNode(); + ConsumerNode<E,Q,L> prevNode = _head; + ConsumerNode<E,Q,L> node = _head.nextNode(); while(node != null) { @@ -170,7 +170,7 @@ class QueueConsumerList //correctness reasons, however we have just 'deleted' //the tail. Inserting an empty dummy node after it will //let us scavenge the node containing the Consumer. - insert(new ConsumerNode(), false); + insert(new ConsumerNode<E,Q,L>(), false); } //advance the next node reference in the 'prevNode' to scavenge @@ -189,9 +189,9 @@ class QueueConsumerList return false; } - private void nodeMarkerCleanup(final ConsumerNode node) + private void nodeMarkerCleanup(final ConsumerNode<E,Q,L> node) { - ConsumerNode markedNode = _subNodeMarker.get(); + ConsumerNode<E,Q,L> markedNode = _subNodeMarker.get(); if(node == markedNode) { //if the marked node is the one we are removing, then @@ -200,7 +200,7 @@ class QueueConsumerList //into the list and find the next node to use. //Because we inserted a dummy if node was the //tail, markedNode.nextNode() can never be null. - ConsumerNode dummy = new ConsumerNode(); + ConsumerNode<E,Q,L> dummy = new ConsumerNode<E,Q,L>(); dummy.setNext(markedNode.nextNode()); //if the CAS fails the marked node has changed, thus @@ -219,7 +219,7 @@ class QueueConsumerList } } - public boolean updateMarkedNode(final ConsumerNode expected, final ConsumerNode nextNode) + public boolean updateMarkedNode(final ConsumerNode<E,Q,L> expected, final ConsumerNode<E,Q,L> nextNode) { return _subNodeMarker.compareAndSet(expected, nextNode); } @@ -231,41 +231,41 @@ class QueueConsumerList * * @return the previously marked node (or a dummy if it was subsequently deleted) */ - public ConsumerNode getMarkedNode() + public ConsumerNode<E,Q,L> getMarkedNode() { return _subNodeMarker.get(); } - public static class ConsumerNodeIterator + public static class ConsumerNodeIterator<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E, Q,L>, L extends SimpleQueueEntryList<E,Q,L>> { - private ConsumerNode _lastNode; + private ConsumerNode<E,Q,L> _lastNode; - ConsumerNodeIterator(ConsumerNode startNode) + ConsumerNodeIterator(ConsumerNode<E,Q,L> startNode) { _lastNode = startNode; } - public ConsumerNode getNode() + public ConsumerNode<E,Q,L> getNode() { return _lastNode; } public boolean advance() { - ConsumerNode nextNode = _lastNode.findNext(); + ConsumerNode<E,Q,L> nextNode = _lastNode.findNext(); _lastNode = nextNode; return _lastNode != null; } } - public ConsumerNodeIterator iterator() + public ConsumerNodeIterator<E,Q,L> iterator() { - return new ConsumerNodeIterator(_head); + return new ConsumerNodeIterator<E,Q,L>(_head); } - public ConsumerNode getHead() + public ConsumerNode<E,Q,L> getHead() { return _head; } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java index 861bd3dea1..7a59c154b6 100755 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java @@ -23,32 +23,32 @@ package org.apache.qpid.server.queue; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -final class QueueContext +final class QueueContext<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> { - private volatile QueueEntry _lastSeenEntry; - private volatile QueueEntry _releasedEntry; + private volatile E _lastSeenEntry; + private volatile E _releasedEntry; - static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry> + static final AtomicReferenceFieldUpdater<QueueContext, QueueEntryImpl> _lastSeenUpdater = AtomicReferenceFieldUpdater.newUpdater - (QueueContext.class, QueueEntry.class, "_lastSeenEntry"); - static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry> + (QueueContext.class, QueueEntryImpl.class, "_lastSeenEntry"); + static final AtomicReferenceFieldUpdater<QueueContext, QueueEntryImpl> _releasedUpdater = AtomicReferenceFieldUpdater.newUpdater - (QueueContext.class, QueueEntry.class, "_releasedEntry"); + (QueueContext.class, QueueEntryImpl.class, "_releasedEntry"); - public QueueContext(QueueEntry head) + public QueueContext(E head) { _lastSeenEntry = head; } - public QueueEntry getLastSeenEntry() + public E getLastSeenEntry() { return _lastSeenEntry; } - QueueEntry getReleasedEntry() + E getReleasedEntry() { return _releasedEntry; } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index 6a42088c47..9a49cd6088 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -20,20 +20,21 @@ */ package org.apache.qpid.server.queue; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.message.MessageInstance; -public interface QueueEntry extends MessageInstance<QueueConsumer>, Comparable<QueueEntry> +public interface QueueEntry<E extends QueueEntry<E,Q,C>, Q extends AMQQueue<E,Q,C>, C extends Consumer> extends MessageInstance<E,C>, Comparable<E> { - AMQQueue<QueueConsumer> getQueue(); + Q getQueue(); long getSize(); boolean isQueueDeleted(); - QueueEntry getNextNode(); + E getNextNode(); - QueueEntry getNextValidEntry(); + E getNextValidEntry(); } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 8b81a87903..8b6968b2cf 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -44,11 +44,11 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -public abstract class QueueEntryImpl implements QueueEntry +public abstract class QueueEntryImpl<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> implements QueueEntry<E,Q,QueueConsumer<?,E,Q,L>> { private static final Logger _log = Logger.getLogger(QueueEntryImpl.class); - private final QueueEntryList _queueEntryList; + private final L _queueEntryList; private final MessageReference _message; @@ -63,7 +63,7 @@ public abstract class QueueEntryImpl implements QueueEntry (QueueEntryImpl.class, EntryState.class, "_state"); - private volatile Set<StateChangeListener<MessageInstance<QueueConsumer>, State>> _stateChangeListeners; + private volatile Set<StateChangeListener<? super E, State>> _stateChangeListeners; private static final AtomicReferenceFieldUpdater<QueueEntryImpl, Set> @@ -90,14 +90,14 @@ public abstract class QueueEntryImpl implements QueueEntry private boolean _deliveredToConsumer; - public QueueEntryImpl(QueueEntryList<?> queueEntryList) + public QueueEntryImpl(L queueEntryList) { this(queueEntryList,null,Long.MIN_VALUE); _state = DELETED_STATE; } - public QueueEntryImpl(QueueEntryList<?> queueEntryList, ServerMessage message, final long entryId) + public QueueEntryImpl(L queueEntryList, ServerMessage message, final long entryId) { _queueEntryList = queueEntryList; @@ -107,7 +107,7 @@ public abstract class QueueEntryImpl implements QueueEntry populateInstanceProperties(); } - public QueueEntryImpl(QueueEntryList<?> queueEntryList, ServerMessage message) + public QueueEntryImpl(L queueEntryList, ServerMessage message) { _queueEntryList = queueEntryList; _message = message == null ? null : message.newReference(); @@ -138,7 +138,7 @@ public abstract class QueueEntryImpl implements QueueEntry return _entryId; } - public AMQQueue<QueueConsumer> getQueue() + public Q getQueue() { return _queueEntryList.getQueue(); } @@ -234,12 +234,12 @@ public abstract class QueueEntryImpl implements QueueEntry if(state instanceof ConsumerAcquiredState) { - getQueue().decrementUnackedMsgCount(this); + getQueue().decrementUnackedMsgCount((E) this); } if(!getQueue().isDeleted()) { - getQueue().requeue(this); + getQueue().requeue((E)this); if(_stateChangeListeners != null) { notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE); @@ -315,13 +315,12 @@ public abstract class QueueEntryImpl implements QueueEntry if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE)) { - Consumer s = null; if (state instanceof ConsumerAcquiredState) { - getQueue().decrementUnackedMsgCount(this); + getQueue().decrementUnackedMsgCount((E) this); } - getQueue().dequeue(this,s); + getQueue().dequeue((E)this); if(_stateChangeListeners != null) { notifyStateChange(state.getState() , QueueEntry.State.DEQUEUED); @@ -333,9 +332,9 @@ public abstract class QueueEntryImpl implements QueueEntry private void notifyStateChange(final State oldState, final State newState) { - for(StateChangeListener<MessageInstance<QueueConsumer>, State> l : _stateChangeListeners) + for(StateChangeListener<? super E, State> l : _stateChangeListeners) { - l.stateChanged(this, oldState, newState); + l.stateChanged((E)this, oldState, newState); } } @@ -345,7 +344,7 @@ public abstract class QueueEntryImpl implements QueueEntry if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE)) { - _queueEntryList.entryDeleted(this); + _queueEntryList.entryDeleted((E)this); onDelete(); _message.release(); @@ -364,7 +363,7 @@ public abstract class QueueEntryImpl implements QueueEntry dispose(); } - public int routeToAlternate(final Action<MessageInstance<? extends Consumer>> action, ServerTransaction txn) + public int routeToAlternate(final Action<? super MessageInstance<?, ? extends Consumer>> action, ServerTransaction txn) { final AMQQueue currentQueue = getQueue(); Exchange alternateExchange = currentQueue.getAlternateExchange(); @@ -412,21 +411,21 @@ public abstract class QueueEntryImpl implements QueueEntry return getQueue().isDeleted(); } - public void addStateChangeListener(StateChangeListener<MessageInstance<QueueConsumer>, State> listener) + public void addStateChangeListener(StateChangeListener<? super E,State> listener) { - Set<StateChangeListener<MessageInstance<QueueConsumer>, State>> listeners = _stateChangeListeners; + Set<StateChangeListener<? super E, State>> listeners = _stateChangeListeners; if(listeners == null) { - _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener<MessageInstance<QueueConsumer>, State>>()); + _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener<? super E, State>>()); listeners = _stateChangeListeners; } listeners.add(listener); } - public boolean removeStateChangeListener(StateChangeListener<MessageInstance<QueueConsumer>, State> listener) + public boolean removeStateChangeListener(StateChangeListener<? super E, State> listener) { - Set<StateChangeListener<MessageInstance<QueueConsumer>, State>> listeners = _stateChangeListeners; + Set<StateChangeListener<? super E, State>> listeners = _stateChangeListeners; if(listeners != null) { return listeners.remove(listener); @@ -436,9 +435,9 @@ public abstract class QueueEntryImpl implements QueueEntry } - public int compareTo(final QueueEntry o) + public int compareTo(final E o) { - QueueEntryImpl other = (QueueEntryImpl)o; + E other = o; return getEntryId() > other.getEntryId() ? 1 : getEntryId() < other.getEntryId() ? -1 : 0; } @@ -446,7 +445,7 @@ public abstract class QueueEntryImpl implements QueueEntry { } - public QueueEntryList getQueueEntryList() + public L getQueueEntryList() { return _queueEntryList; } @@ -494,10 +493,10 @@ public abstract class QueueEntryImpl implements QueueEntry @Override public boolean resend() throws AMQException { - QueueConsumer sub = getDeliveredConsumer(); + QueueConsumer<?,E,Q,L> sub = getDeliveredConsumer(); if(sub != null) { - return sub.resend(this); + return sub.resend((E)this); } return false; } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java index 73ebb0f300..72502feb3d 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java @@ -20,11 +20,13 @@ */ package org.apache.qpid.server.queue; -public interface QueueEntryIterator<QE extends QueueEntry> +import org.apache.qpid.server.consumer.Consumer; + +public interface QueueEntryIterator<E extends QueueEntry<E,Q,C>, Q extends AMQQueue<E,Q,C>, L extends QueueEntryList<E,Q,L,C>, C extends Consumer> { boolean atTail(); - QE getNode(); + E getNode(); boolean advance(); } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java index ad1f703f51..a49320e9d6 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java @@ -20,21 +20,23 @@ */ package org.apache.qpid.server.queue; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.message.ServerMessage; -public interface QueueEntryList<Q extends QueueEntry> +public interface QueueEntryList<E extends QueueEntry<E,Q,C>, Q extends AMQQueue<E,Q,C>, L extends QueueEntryList<E,Q,L,C>, C extends Consumer> { - AMQQueue<QueueConsumer> getQueue(); + Q getQueue(); - Q add(ServerMessage message); + E add(ServerMessage message); - Q next(Q node); + E next(E node); - QueueEntryIterator<Q> iterator(); + QueueEntryIterator<E,Q,L,C> iterator(); - Q getHead(); + E getHead(); - void entryDeleted(Q queueEntry); + void entryDeleted(E queueEntry); int getPriorities(); + } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java index 4dbce45f67..ae8b6560be 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.queue; -interface QueueEntryListFactory +interface QueueEntryListFactory<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> { - public QueueEntryList createQueueEntryList(AMQQueue queue); + public L createQueueEntryList(Q queue); } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java index 9ecaf6dafd..e6b5ac5611 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java @@ -16,7 +16,9 @@ */ package org.apache.qpid.server.queue; -public interface QueueEntryVisitor +import org.apache.qpid.server.consumer.Consumer; + +public interface QueueEntryVisitor<E extends QueueEntry> { - boolean visit(QueueEntry entry); + boolean visit(E entry); } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index f4a9794fcd..4450a3ed0c 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -43,7 +43,6 @@ import org.apache.qpid.server.logging.actors.QueueActor; import org.apache.qpid.server.logging.messages.QueueMessages; import org.apache.qpid.server.logging.subjects.QueueLogSubject; import org.apache.qpid.server.message.InstanceProperties; -import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; @@ -52,6 +51,7 @@ import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.consumer.ConsumerTarget; +import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; @@ -59,9 +59,9 @@ import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.StateChangeListener; import org.apache.qpid.server.virtualhost.VirtualHost; -public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, - StateChangeListener<QueueConsumer, QueueConsumer.State>, - MessageGroupManager.ConsumerResetHelper +abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E, Q,L>, L extends SimpleQueueEntryList<E,Q,L>> implements AMQQueue<E, Q, QueueConsumer<?,E,Q,L>>, + StateChangeListener<QueueConsumer<?,E,Q,L>, QueueConsumer.State>, + MessageGroupManager.ConsumerResetHelper<E,Q,L> { private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class); @@ -94,11 +94,11 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, private Exchange _alternateExchange; - private final QueueEntryList<QueueEntry> _entries; + private final L _entries; - private final QueueConsumerList _consumerList = new QueueConsumerList(); + private final QueueConsumerList<E,Q,L> _consumerList = new QueueConsumerList<E,Q,L>(); - private volatile QueueConsumer _exclusiveSubscriber; + private volatile QueueConsumer<?,E,Q,L> _exclusiveSubscriber; @@ -163,8 +163,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, private LogSubject _logSubject; private LogActor _logActor; - private static final String SUB_FLUSH_RUNNER = "SUB_FLUSH_RUNNER"; - private boolean _nolocal; + private boolean _noLocal; private final AtomicBoolean _overfull = new AtomicBoolean(false); private boolean _deleteOnNoConsumers; @@ -177,20 +176,15 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, /** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */ private int _maximumDeliveryCount; - private final MessageGroupManager _messageGroupManager; + private final MessageGroupManager<E,Q,L> _messageGroupManager; - private final Collection<ConsumerRegistrationListener> _consumerListeners = - new ArrayList<ConsumerRegistrationListener>(); + private final Collection<ConsumerRegistrationListener<Q>> _consumerListeners = + new ArrayList<ConsumerRegistrationListener<Q>>(); private AMQQueue.NotificationListener _notificationListener; private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length]; - public SimpleAMQQueue(UUID id, String queueName, boolean durable, String owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String, Object> arguments) - { - this(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, new SimpleQueueEntryList.Factory(), arguments); - } - protected SimpleAMQQueue(UUID id, String name, boolean durable, @@ -198,7 +192,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, - QueueEntryListFactory entryListFactory, Map<String,Object> arguments) + QueueEntryListFactory<E,Q,L> entryListFactory, Map<String,Object> arguments) { if (name == null) @@ -217,7 +211,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, _autoDelete = autoDelete; _exclusive = exclusive; _virtualHost = virtualHost; - _entries = entryListFactory.createQueueEntryList(this); + _entries = entryListFactory.createQueueEntryList((Q)this); _arguments = Collections.synchronizedMap(arguments == null ? new LinkedHashMap<String, Object>() : new LinkedHashMap<String, Object>(arguments)); _id = id; @@ -243,13 +237,13 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, { Object defaultGroup = arguments.get(Queue.MESSAGE_GROUP_DEFAULT_GROUP); _messageGroupManager = - new DefinedGroupMessageGroupManager(String.valueOf(arguments.get(Queue.MESSAGE_GROUP_KEY)), + new DefinedGroupMessageGroupManager<E,Q,L>(String.valueOf(arguments.get(Queue.MESSAGE_GROUP_KEY)), defaultGroup == null ? DEFAULT_SHARED_MESSAGE_GROUP : defaultGroup.toString(), this); } else { - _messageGroupManager = new AssignedConsumerMessageGroupManager(String.valueOf(arguments.get( + _messageGroupManager = new AssignedConsumerMessageGroupManager<E,Q,L>(String.valueOf(arguments.get( Queue.MESSAGE_GROUP_KEY)), DEFAULT_MAX_GROUPS); } } @@ -281,21 +275,20 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, } catch (RejectedExecutionException ree) { - if (_stopped.get()) - { - // Ignore - SubFlusherRunner or QueueRunner submitted execution as queue was being stopped. - } - else + // Ignore - SubFlusherRunner or QueueRunner submitted execution as queue was being stopped. + if(!_stopped.get()) { _logger.error("Unexpected rejected execution", ree); throw ree; + } + } } public void setNoLocal(boolean nolocal) { - _nolocal = nolocal; + _noLocal = nolocal; } public UUID getId() @@ -384,7 +377,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, @Override - public synchronized QueueConsumer addConsumer(final ConsumerTarget target, + public synchronized <T extends ConsumerTarget> QueueConsumer<T,E,Q,L> addConsumer(final T target, final FilterManager filters, final Class<? extends ServerMessage> messageClass, final String consumerName, @@ -412,10 +405,10 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, throw new ExistingConsumerPreventsExclusive(); } - QueueConsumer consumer = new QueueConsumer(filters, messageClass, - optionSet.contains(Consumer.Option.ACQUIRES), - optionSet.contains(Consumer.Option.SEES_REQUEUES), - consumerName, optionSet.contains(Consumer.Option.TRANSIENT), target); + QueueConsumer<T,E,Q,L> consumer = new QueueConsumer<T,E,Q,L>(filters, messageClass, + optionSet.contains(Consumer.Option.ACQUIRES), + optionSet.contains(Consumer.Option.SEES_REQUEUES), + consumerName, optionSet.contains(Consumer.Option.TRANSIENT), target); target.consumerAdded(consumer); @@ -430,21 +423,21 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, } consumer.setStateListener(this); - consumer.setQueueContext(new QueueContext(_entries.getHead())); + consumer.setQueueContext(new QueueContext<E,Q,L>(_entries.getHead())); if (!isDeleted()) { - consumer.setQueue(this, exclusive); - if(_nolocal) + consumer.setQueue((Q)this, exclusive); + if(_noLocal) { - consumer.setNoLocal(_nolocal); + consumer.setNoLocal(true); } synchronized (_consumerListeners) { - for(ConsumerRegistrationListener listener : _consumerListeners) + for(ConsumerRegistrationListener<Q> listener : _consumerListeners) { - listener.consumerAdded(this, consumer); + listener.consumerAdded((Q)this, consumer); } } @@ -466,7 +459,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, } - synchronized void unregisterConsumer(final QueueConsumer consumer) throws AMQException + synchronized void unregisterConsumer(final QueueConsumer<?,E,Q,L> consumer) throws AMQException { if (consumer == null) { @@ -494,9 +487,9 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, synchronized (_consumerListeners) { - for(ConsumerRegistrationListener listener : _consumerListeners) + for(ConsumerRegistrationListener<Q> listener : _consumerListeners) { - listener.consumerRemoved(this, consumer); + listener.consumerRemoved((Q)this, consumer); } } @@ -519,10 +512,10 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, } - public Collection<QueueConsumer> getConsumers() + public Collection<QueueConsumer<?,E,Q,L>> getConsumers() { - List<QueueConsumer> consumers = new ArrayList<QueueConsumer>(); - QueueConsumerList.ConsumerNodeIterator iter = _consumerList.iterator(); + List<QueueConsumer<?,E,Q,L>> consumers = new ArrayList<QueueConsumer<?,E,Q,L>>(); + QueueConsumerList.ConsumerNodeIterator<E,Q,L> iter = _consumerList.iterator(); while(iter.advance()) { consumers.add(iter.getNode().getConsumer()); @@ -531,7 +524,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, } - public void addConsumerRegistrationListener(final ConsumerRegistrationListener listener) + public void addConsumerRegistrationListener(final ConsumerRegistrationListener<Q> listener) { synchronized (_consumerListeners) { @@ -539,7 +532,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, } } - public void removeConsumerRegistrationListener(final ConsumerRegistrationListener listener) + public void removeConsumerRegistrationListener(final ConsumerRegistrationListener<Q> listener) { synchronized (_consumerListeners) { @@ -547,9 +540,9 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, } } - public void resetSubPointersForGroups(QueueConsumer consumer, boolean clearAssignments) + public void resetSubPointersForGroups(QueueConsumer<?,E,Q,L> consumer, boolean clearAssignments) { - QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(consumer); + E entry = _messageGroupManager.findEarliestAssignedAvailableEntry(consumer); if(clearAssignments) { _messageGroupManager.clearAssignments(consumer); @@ -557,11 +550,11 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, if(entry != null) { - QueueConsumerList.ConsumerNodeIterator subscriberIter = _consumerList.iterator(); + QueueConsumerList.ConsumerNodeIterator<E,Q,L> subscriberIter = _consumerList.iterator(); // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards while (subscriberIter.advance()) { - QueueConsumer sub = subscriberIter.getNode().getConsumer(); + QueueConsumer<?,E,Q,L> sub = subscriberIter.getNode().getConsumer(); // we don't make browsers send the same stuff twice if (sub.seesRequeues()) @@ -599,11 +592,6 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, } } - public int getBindingCountHigh() - { - return _bindingCountHigh.get(); - } - public void removeBinding(final Binding binding) { _bindings.remove(binding); @@ -626,7 +614,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, // ------ Enqueue / Dequeue - public void enqueue(ServerMessage message, Action<MessageInstance<? extends Consumer>> action) throws AMQException + public void enqueue(ServerMessage message, Action<? super MessageInstance<?, QueueConsumer<?,E,Q,L>>> action) throws AMQException { incrementQueueCount(); incrementQueueSize(message); @@ -634,8 +622,8 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, _totalMessagesReceived.incrementAndGet(); - QueueEntry entry; - final QueueConsumer exclusiveSub = _exclusiveSubscriber; + E entry; + final QueueConsumer<?,E,Q,L> exclusiveSub = _exclusiveSubscriber; entry = _entries.add(message); if(action != null || (exclusiveSub == null && _queueRunner.isIdle())) @@ -645,8 +633,8 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, iterate over consumers and if any is at the end of the queue and can deliver this message, then deliver the message */ - QueueConsumerList.ConsumerNode node = _consumerList.getMarkedNode(); - QueueConsumerList.ConsumerNode nextNode = node.findNext(); + QueueConsumerList.ConsumerNode<E,Q,L> node = _consumerList.getMarkedNode(); + QueueConsumerList.ConsumerNode<E,Q,L> nextNode = node.findNext(); if (nextNode == null) { nextNode = _consumerList.getHead().findNext(); @@ -682,7 +670,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, else { // if consumer at end, and active, offer - QueueConsumer sub = nextNode.getConsumer(); + QueueConsumer<?,E,Q,L> sub = nextNode.getConsumer(); deliverToConsumer(sub, entry); } nextNode = nextNode.findNext(); @@ -714,7 +702,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, } - private void deliverToConsumer(final QueueConsumer sub, final QueueEntry entry) + private void deliverToConsumer(final QueueConsumer<?,E,Q,L> sub, final E entry) throws AMQException { @@ -746,7 +734,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, } } - private boolean assign(final QueueConsumer sub, final QueueEntry entry) + private boolean assign(final QueueConsumer<?,E,Q,L> sub, final E entry) { if(_messageGroupManager == null) { @@ -760,7 +748,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, } } - private boolean mightAssign(final QueueConsumer sub, final QueueEntry entry) + private boolean mightAssign(final QueueConsumer<?,E,Q,L> sub, final E entry) { if(_messageGroupManager == null || !sub.acquires()) { @@ -770,7 +758,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, return (assigned == null) || (assigned == sub); } - protected void checkConsumersNotAheadOfDelivery(final QueueEntry entry) + protected void checkConsumersNotAheadOfDelivery(final E entry) { // This method is only required for queues which mess with ordering // Simple Queues don't :-) @@ -804,7 +792,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, getAtomicQueueCount().incrementAndGet(); } - private void deliverMessage(final QueueConsumer sub, final QueueEntry entry, boolean batch) + private void deliverMessage(final QueueConsumer<?,E,Q,L> sub, final E entry, boolean batch) throws AMQException { setLastSeenEntry(sub, entry); @@ -815,18 +803,18 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, sub.send(entry, batch); } - private boolean consumerReadyAndHasInterest(final QueueConsumer sub, final QueueEntry entry) throws AMQException + private boolean consumerReadyAndHasInterest(final QueueConsumer<?,E,Q,L> sub, final E entry) throws AMQException { return sub.hasInterest(entry) && (getNextAvailableEntry(sub) == entry); } - private void setLastSeenEntry(final QueueConsumer sub, final QueueEntry entry) + private void setLastSeenEntry(final QueueConsumer<?,E,Q,L> sub, final E entry) { - QueueContext subContext = sub.getQueueContext(); + QueueContext<E,Q,L> subContext = sub.getQueueContext(); if (subContext != null) { - QueueEntry releasedEntry = subContext.getReleasedEntry(); + E releasedEntry = subContext.getReleasedEntry(); QueueContext._lastSeenUpdater.set(subContext, entry); if(releasedEntry == entry) @@ -836,13 +824,13 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, } } - private void updateSubRequeueEntry(final QueueConsumer sub, final QueueEntry entry) + private void updateSubRequeueEntry(final QueueConsumer<?,E,Q,L> sub, final E entry) { - QueueContext subContext = sub.getQueueContext(); + QueueContext<E,Q,L> subContext = sub.getQueueContext(); if(subContext != null) { - QueueEntry oldEntry; + E oldEntry; while((oldEntry = subContext.getReleasedEntry()) == null || oldEntry.compareTo(entry) > 0) { @@ -854,13 +842,13 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, } } - public void requeue(QueueEntry entry) + public void requeue(E entry) { - QueueConsumerList.ConsumerNodeIterator subscriberIter = _consumerList.iterator(); + QueueConsumerList.ConsumerNodeIterator<E,Q,L> subscriberIter = _consumerList.iterator(); // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards while (subscriberIter.advance() && entry.isAvailable()) { - QueueConsumer sub = subscriberIter.getNode().getConsumer(); + QueueConsumer<?,E,Q,L> sub = subscriberIter.getNode().getConsumer(); // we don't make browsers send the same stuff twice if (sub.seesRequeues()) @@ -873,7 +861,8 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, } - public void dequeue(QueueEntry entry, Consumer sub) + @Override + public void dequeue(E entry) { decrementQueueCount(); decrementQueueSize(entry); @@ -886,7 +875,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, } - private void decrementQueueSize(final QueueEntry entry) + private void decrementQueueSize(final E entry) { final ServerMessage message = entry.getMessage(); long size = message.getSize(); @@ -905,7 +894,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, _dequeueCount.incrementAndGet(); } - public boolean resend(final QueueEntry entry, final Consumer consumer) throws AMQException + public boolean resend(final E entry, final QueueConsumer<?,E,Q,L> consumer) throws AMQException { /* TODO : This is wrong as the consumer may be suspended, we should instead change the state of the message entry to resend and move back the consumer pointer. */ @@ -915,7 +904,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, { if (!consumer.isClosed()) { - deliverMessage((QueueConsumer) consumer, entry, false); + deliverMessage(consumer, entry, false); return true; } else @@ -981,11 +970,11 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, public long getOldestMessageArrivalTime() { - QueueEntry entry = getOldestQueueEntry(); + E entry = getOldestQueueEntry(); return entry == null ? Long.MAX_VALUE : entry.getMessage().getArrivalTime(); } - protected QueueEntry getOldestQueueEntry() + protected E getOldestQueueEntry() { return _entries.next(_entries.getHead()); } @@ -995,13 +984,13 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, return _deleted.get(); } - public List<QueueEntry> getMessagesOnTheQueue() + public List<E> getMessagesOnTheQueue() { - ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>(); - QueueEntryIterator queueListIterator = _entries.iterator(); + ArrayList<E> entryList = new ArrayList<E>(); + QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator(); while (queueListIterator.advance()) { - QueueEntry node = queueListIterator.getNode(); + E node = queueListIterator.getNode(); if (node != null && !node.isDeleted()) { entryList.add(node); @@ -1011,7 +1000,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, } - public void stateChanged(QueueConsumer sub, QueueConsumer.State oldState, QueueConsumer.State newState) + public void stateChanged(QueueConsumer<?,E,Q,L> sub, QueueConsumer.State oldState, QueueConsumer.State newState) { if (oldState == QueueConsumer.State.ACTIVE && newState != QueueConsumer.State.ACTIVE) { @@ -1029,7 +1018,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, } } - public int compareTo(final AMQQueue o) + public int compareTo(final Q o) { return _name.compareTo(o.getName()); } @@ -1049,7 +1038,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, return _exclusiveSubscriber != null; } - private void setExclusiveSubscriber(QueueConsumer exclusiveSubscriber) + private void setExclusiveSubscriber(QueueConsumer<?,E,Q,L> exclusiveSubscriber) { _exclusiveSubscriber = exclusiveSubscriber; } @@ -1060,32 +1049,32 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, } /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */ - protected QueueEntryList getEntries() + protected L getEntries() { return _entries; } - protected QueueConsumerList getConsumerList() + protected QueueConsumerList<E,Q,L> getConsumerList() { return _consumerList; } - public static interface QueueEntryFilter + public static interface QueueEntryFilter<E extends QueueEntry> { - public boolean accept(QueueEntry entry); + public boolean accept(E entry); public boolean filterComplete(); } - public List<QueueEntry> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId) + public List<E> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId) { - return getMessagesOnTheQueue(new QueueEntryFilter() + return getMessagesOnTheQueue(new QueueEntryFilter<E>() { - public boolean accept(QueueEntry entry) + public boolean accept(E entry) { final long messageId = entry.getMessage().getMessageNumber(); return messageId >= fromMessageId && messageId <= toMessageId; @@ -1098,13 +1087,13 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, }); } - public QueueEntry getMessageOnTheQueue(final long messageId) + public E getMessageOnTheQueue(final long messageId) { - List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter() + List<E> entries = getMessagesOnTheQueue(new QueueEntryFilter<E>() { private boolean _complete; - public boolean accept(QueueEntry entry) + public boolean accept(E entry) { _complete = entry.getMessage().getMessageNumber() == messageId; return _complete; @@ -1118,13 +1107,13 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, return entries.isEmpty() ? null : entries.get(0); } - public List<QueueEntry> getMessagesOnTheQueue(QueueEntryFilter filter) + public List<E> getMessagesOnTheQueue(QueueEntryFilter<E> filter) { - ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>(); - QueueEntryIterator queueListIterator = _entries.iterator(); + ArrayList<E> entryList = new ArrayList<E>(); + QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator(); while (queueListIterator.advance() && !filter.filterComplete()) { - QueueEntry node = queueListIterator.getNode(); + E node = queueListIterator.getNode(); if (!node.isDeleted() && filter.accept(node)) { entryList.add(node); @@ -1134,13 +1123,13 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, } - public void visit(final QueueEntryVisitor visitor) + public void visit(final QueueEntryVisitor<E> visitor) { - QueueEntryIterator queueListIterator = _entries.iterator(); + QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator(); while(queueListIterator.advance()) { - QueueEntry node = queueListIterator.getNode(); + E node = queueListIterator.getNode(); if(!node.isDeleted()) { @@ -1157,17 +1146,17 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, * * The 'queue position' index starts from 1. Using 0 in 'from' will be ignored and continue from 1. * Using 0 in the 'to' field will return an empty list regardless of the 'from' value. - * @param fromPosition - * @param toPosition - * @return + * @param fromPosition first message position + * @param toPosition last message position + * @return list of messages */ - public List<QueueEntry> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition) + public List<E> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition) { - return getMessagesOnTheQueue(new QueueEntryFilter() + return getMessagesOnTheQueue(new QueueEntryFilter<E>() { private long position = 0; - public boolean accept(QueueEntry entry) + public boolean accept(E entry) { position++; return (position >= fromPosition) && (position <= toPosition); @@ -1196,12 +1185,12 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, // TODO - now only used by the tests public void deleteMessageFromTop() { - QueueEntryIterator queueListIterator = _entries.iterator(); + QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator(); boolean noDeletes = true; while (noDeletes && queueListIterator.advance()) { - QueueEntry node = queueListIterator.getNode(); + E node = queueListIterator.getNode(); if (node.acquire()) { dequeueEntry(node); @@ -1224,14 +1213,14 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, throw new AMQSecurityException("Permission denied: queue " + getName()); } - QueueEntryIterator queueListIterator = _entries.iterator(); + QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator(); long count = 0; ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore()); while (queueListIterator.advance()) { - QueueEntry node = queueListIterator.getNode(); + E node = queueListIterator.getNode(); if (node.acquire()) { dequeueEntry(node, txn); @@ -1248,13 +1237,13 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, return count; } - private void dequeueEntry(final QueueEntry node) + private void dequeueEntry(final E node) { ServerTransaction txn = new AutoCommitTransaction(getVirtualHost().getMessageStore()); dequeueEntry(node, txn); } - private void dequeueEntry(final QueueEntry node, ServerTransaction txn) + private void dequeueEntry(final E node, ServerTransaction txn) { txn.dequeue(this, node.getMessage(), new ServerTransaction.Action() @@ -1283,7 +1272,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, } // TODO list all thrown exceptions - public int delete() throws AMQSecurityException, AMQException + public int delete() throws AMQException { // Check access if (!_virtualHost.getSecurityManager().authoriseDelete(this)) @@ -1313,10 +1302,10 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, } - List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter() + List<E> entries = getMessagesOnTheQueue(new QueueEntryFilter<E>() { - public boolean accept(QueueEntry entry) + public boolean accept(E entry) { return entry.acquire(); } @@ -1330,7 +1319,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore()); - for(final QueueEntry entry : entries) + for(final E entry : entries) { // TODO log requeues with a post enqueue action int requeues = entry.routeToAlternate(null, txn); @@ -1435,7 +1424,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, } - public void deliverAsync(QueueConsumer sub) + public void deliverAsync(QueueConsumer<?,E,Q,L> sub) { if(_exclusiveSubscriber == null) { @@ -1449,7 +1438,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, } - void flushConsumer(QueueConsumer sub) throws AMQException + void flushConsumer(QueueConsumer<?,E,Q,L> sub) throws AMQException { // Access control if (!getVirtualHost().getSecurityManager().authoriseConsume(this)) @@ -1459,7 +1448,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, flushConsumer(sub, Long.MAX_VALUE); } - boolean flushConsumer(QueueConsumer sub, long iterations) throws AMQException + boolean flushConsumer(QueueConsumer<?,E,Q,L> sub, long iterations) throws AMQException { boolean atTail = false; final boolean keepSendLockHeld = iterations <= SimpleAMQQueue.MAX_ASYNC_DELIVERIES; @@ -1480,8 +1469,8 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, sub.getSendLock(); } - atTail = attemptDelivery((QueueConsumer)sub, true); - if (atTail && getNextAvailableEntry((QueueConsumer)sub) == null) + atTail = attemptDelivery(sub, true); + if (atTail && getNextAvailableEntry(sub) == null) { queueEmpty = true; } @@ -1532,12 +1521,12 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, * Looks up the next node for the consumer and attempts to deliver it. * * - * @param sub - * @param batch + * @param sub the consumer + * @param batch true if processing can be batched * @return true if we have completed all possible deliveries for this sub. * @throws AMQException */ - private boolean attemptDelivery(QueueConsumer sub, boolean batch) throws AMQException + private boolean attemptDelivery(QueueConsumer<?,E,Q,L> sub, boolean batch) throws AMQException { boolean atTail = false; @@ -1545,7 +1534,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, if (subActive) { - QueueEntry node = getNextAvailableEntry(sub); + E node = getNextAvailableEntry(sub); if (node != null && node.isAvailable()) { @@ -1582,11 +1571,11 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, protected void advanceAllConsumers() throws AMQException { - QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator(); + QueueConsumerList.ConsumerNodeIterator<E,Q,L> consumerNodeIterator = _consumerList.iterator(); while (consumerNodeIterator.advance()) { - QueueConsumerList.ConsumerNode subNode = consumerNodeIterator.getNode(); - QueueConsumer sub = subNode.getConsumer(); + QueueConsumerList.ConsumerNode<E,Q,L> subNode = consumerNodeIterator.getNode(); + QueueConsumer<?,E,Q,L> sub = subNode.getConsumer(); if(sub.acquires()) { getNextAvailableEntry(sub); @@ -1598,16 +1587,16 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, } } - private QueueEntry getNextAvailableEntry(final QueueConsumer sub) + private E getNextAvailableEntry(final QueueConsumer<?,E,Q,L> sub) throws AMQException { - QueueContext context = sub.getQueueContext(); + QueueContext<E,Q,L> context = sub.getQueueContext(); if(context != null) { - QueueEntry lastSeen = context.getLastSeenEntry(); - QueueEntry releasedNode = context.getReleasedEntry(); + E lastSeen = context.getLastSeenEntry(); + E releasedNode = context.getReleasedEntry(); - QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen); + E node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen); boolean expired = false; while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node) || @@ -1639,12 +1628,12 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, } } - public boolean isEntryAheadOfConsumer(QueueEntry entry, QueueConsumer sub) + public boolean isEntryAheadOfConsumer(E entry, QueueConsumer<?,E,Q,L> sub) { - QueueContext context = sub.getQueueContext(); + QueueContext<E,Q,L> context = sub.getQueueContext(); if(context != null) { - QueueEntry releasedNode = context.getReleasedEntry(); + E releasedNode = context.getReleasedEntry(); return releasedNode != null && releasedNode.compareTo(entry) < 0; } else @@ -1681,7 +1670,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, */ public long processQueue(QueueRunner runner) throws AMQException { - long stateChangeCount = Long.MIN_VALUE; + long stateChangeCount; long previousStateChangeCount = Long.MIN_VALUE; long rVal = Long.MIN_VALUE; boolean deliveryIncomplete = true; @@ -1716,11 +1705,11 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, boolean allConsumersDone = true; boolean consumerDone; - QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator(); + QueueConsumerList.ConsumerNodeIterator<E,Q,L> consumerNodeIterator = _consumerList.iterator(); //iterate over the subscribers and try to advance their pointer while (consumerNodeIterator.advance()) { - QueueConsumer sub = consumerNodeIterator.getNode().getConsumer(); + QueueConsumer<?,E,Q,L> sub = consumerNodeIterator.getNode().getConsumer(); sub.getSendLock(); try @@ -1802,11 +1791,11 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, public void checkMessageStatus() throws AMQException { - QueueEntryIterator queueListIterator = _entries.iterator(); + QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator(); while (queueListIterator.advance()) { - QueueEntry node = queueListIterator.getNode(); + E node = queueListIterator.getNode(); // Only process nodes that are not currently deleted and not dequeued if (!node.isDeleted()) { @@ -1953,12 +1942,12 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, return _notificationChecks; } - private final class QueueEntryListener implements StateChangeListener<MessageInstance<QueueConsumer>, QueueEntry.State> + private final class QueueEntryListener implements StateChangeListener<E, QueueEntry.State> { - private final QueueConsumer _sub; + private final QueueConsumer<?,E,Q,L> _sub; - public QueueEntryListener(final QueueConsumer sub) + public QueueEntryListener(final QueueConsumer<?,E,Q,L> sub) { _sub = sub; } @@ -1974,7 +1963,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, return System.identityHashCode(_sub); } - public void stateChanged(MessageInstance entry, QueueEntry.State oldSate, QueueEntry.State newState) + public void stateChanged(E entry, QueueEntry.State oldSate, QueueEntry.State newState) { entry.removeStateChangeListener(this); deliverAsync(_sub); @@ -2082,13 +2071,13 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, return _unackedMsgBytes.get(); } - public void decrementUnackedMsgCount(QueueEntry queueEntry) + public void decrementUnackedMsgCount(E queueEntry) { _unackedMsgCount.decrementAndGet(); _unackedMsgBytes.addAndGet(-queueEntry.getSize()); } - private void incrementUnackedMsgCount(QueueEntry entry) + private void incrementUnackedMsgCount(E entry) { _unackedMsgCount.incrementAndGet(); _unackedMsgBytes.addAndGet(entry.getSize()); @@ -2159,10 +2148,10 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>, return (String) _arguments.get(Queue.DESCRIPTION); } - public final int send(final ServerMessage message, + public final <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message, final InstanceProperties instanceProperties, final ServerTransaction txn, - final Action<MessageInstance<? extends Consumer>> postEnqueueAction) + final Action<? super MessageInstance<?, ? extends Consumer>> postEnqueueAction) { txn.enqueue(this,message, new ServerTransaction.Action() { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java index 101771c7cc..b6954e0696 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java @@ -1,205 +1,25 @@ /* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.message.ServerMessage; - -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; - -public class SimpleQueueEntryList implements QueueEntryList<SimpleQueueEntryImpl> +public interface SimpleQueueEntryList<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> extends QueueEntryList<E,Q,L,QueueConsumer<?,E,Q,L>> { - - private final SimpleQueueEntryImpl _head; - - private volatile SimpleQueueEntryImpl _tail; - - static final AtomicReferenceFieldUpdater<SimpleQueueEntryList, SimpleQueueEntryImpl> - _tailUpdater = - AtomicReferenceFieldUpdater.newUpdater - (SimpleQueueEntryList.class, SimpleQueueEntryImpl.class, "_tail"); - - - private final AMQQueue<QueueConsumer> _queue; - - static final AtomicReferenceFieldUpdater<SimpleQueueEntryImpl, SimpleQueueEntryImpl> - _nextUpdater = SimpleQueueEntryImpl._nextUpdater; - - private AtomicLong _scavenges = new AtomicLong(0L); - private final long _scavengeCount = Integer.getInteger("qpid.queue.scavenge_count", 50); - private final AtomicReference<SimpleQueueEntryImpl> _unscavengedHWM = new AtomicReference<SimpleQueueEntryImpl>(); - - - public SimpleQueueEntryList(AMQQueue<QueueConsumer> queue) - { - _queue = queue; - _head = new SimpleQueueEntryImpl(this); - _tail = _head; - } - - void scavenge() - { - SimpleQueueEntryImpl hwm = _unscavengedHWM.getAndSet(null); - SimpleQueueEntryImpl next = _head.getNextValidEntry(); - - if(hwm != null) - { - while (next != null && hwm.compareTo(next)>0) - { - next = next.getNextValidEntry(); - } - } - } - - - public AMQQueue<QueueConsumer> getQueue() - { - return _queue; - } - - - public SimpleQueueEntryImpl add(ServerMessage message) - { - SimpleQueueEntryImpl node = createQueueEntry(message); - for (;;) - { - SimpleQueueEntryImpl tail = _tail; - SimpleQueueEntryImpl next = tail.getNextNode(); - if (tail == _tail) - { - if (next == null) - { - node.setEntryId(tail.getEntryId()+1); - if (_nextUpdater.compareAndSet(tail, null, node)) - { - _tailUpdater.compareAndSet(this, tail, node); - - return node; - } - } - else - { - _tailUpdater.compareAndSet(this,tail, next); - } - } - } - } - - protected SimpleQueueEntryImpl createQueueEntry(ServerMessage<?> message) - { - return new SimpleQueueEntryImpl(this, message); - } - - public SimpleQueueEntryImpl next(SimpleQueueEntryImpl node) - { - return node.getNextValidEntry(); - } - - public static class QueueEntryIteratorImpl implements QueueEntryIterator<SimpleQueueEntryImpl> - { - private SimpleQueueEntryImpl _lastNode; - - QueueEntryIteratorImpl(SimpleQueueEntryImpl startNode) - { - _lastNode = startNode; - } - - public boolean atTail() - { - return _lastNode.getNextValidEntry() == null; - } - - public SimpleQueueEntryImpl getNode() - { - return _lastNode; - } - - public boolean advance() - { - SimpleQueueEntryImpl nextValidNode = _lastNode.getNextValidEntry(); - - if(nextValidNode != null) - { - _lastNode = nextValidNode; - } - - return nextValidNode != null; - } - } - - public QueueEntryIteratorImpl iterator() - { - return new QueueEntryIteratorImpl(_head); - } - - - public SimpleQueueEntryImpl getHead() - { - return _head; - } - - public void entryDeleted(SimpleQueueEntryImpl queueEntry) - { - SimpleQueueEntryImpl next = _head.getNextNode(); - SimpleQueueEntryImpl newNext = _head.getNextValidEntry(); - - // the head of the queue has not been deleted, hence the deletion must have been mid queue. - if (next == newNext) - { - SimpleQueueEntryImpl unscavengedHWM = _unscavengedHWM.get(); - while(unscavengedHWM == null || unscavengedHWM.compareTo(queueEntry)<0) - { - _unscavengedHWM.compareAndSet(unscavengedHWM, queueEntry); - unscavengedHWM = _unscavengedHWM.get(); - } - if (_scavenges.incrementAndGet() > _scavengeCount) - { - _scavenges.set(0L); - scavenge(); - } - } - else - { - SimpleQueueEntryImpl unscavengedHWM = _unscavengedHWM.get(); - if(unscavengedHWM != null && (next == null || unscavengedHWM.compareTo(next) < 0)) - { - _unscavengedHWM.compareAndSet(unscavengedHWM, null); - } - } - } - - public int getPriorities() - { - return 0; - } - - static class Factory implements QueueEntryListFactory - { - - public SimpleQueueEntryList createQueueEntryList(AMQQueue queue) - { - return new SimpleQueueEntryList(queue); - } - } - - } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java index cad1aa6d4f..2cae18f3ec 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java @@ -29,7 +29,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Map; import java.util.UUID; -public class SortedQueue extends OutOfOrderQueue +public class SortedQueue extends OutOfOrderQueue<SortedQueueEntry, SortedQueue, SortedQueueEntryList> { //Lock object to synchronize enqueue. Used instead of the object //monitor to prevent lock order issues with consumer sendLocks @@ -41,17 +41,33 @@ public class SortedQueue extends OutOfOrderQueue final boolean durable, final String owner, final boolean autoDelete, final boolean exclusive, final VirtualHost virtualHost, Map<String, Object> arguments, String sortedPropertyName) { + this(id, name, durable, owner, autoDelete, exclusive, + virtualHost, arguments, sortedPropertyName, new SortedQueueEntryListFactory(sortedPropertyName)); + } + + + protected SortedQueue(UUID id, final String name, + final boolean durable, final String owner, final boolean autoDelete, + final boolean exclusive, final VirtualHost virtualHost, + Map<String, Object> arguments, + String sortedPropertyName, + QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList> factory) + { super(id, name, durable, owner, autoDelete, exclusive, - virtualHost, new SortedQueueEntryListFactory(sortedPropertyName), arguments); + virtualHost, factory, arguments); this._sortedPropertyName = sortedPropertyName; } + public String getSortedPropertyName() { return _sortedPropertyName; } - public void enqueue(ServerMessage message, Action<MessageInstance<? extends Consumer>> action) throws AMQException + @Override + public void enqueue(final ServerMessage message, + final Action<? super MessageInstance<?, QueueConsumer<?, SortedQueueEntry, SortedQueue, SortedQueueEntryList>>> action) + throws AMQException { synchronized (_sortedQueueLock) { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntry.java index 1052adbe67..30d58138fb 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryImpl.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntry.java @@ -24,37 +24,37 @@ import org.apache.qpid.server.message.ServerMessage; /** * An implementation of QueueEntryImpl to be used in SortedQueueEntryList. */ -public class SortedQueueEntryImpl extends QueueEntryImpl +public class SortedQueueEntry extends QueueEntryImpl<SortedQueueEntry, SortedQueue, SortedQueueEntryList> { public static enum Colour { RED, BLACK }; - private volatile SortedQueueEntryImpl _next; - private SortedQueueEntryImpl _prev; + private volatile SortedQueueEntry _next; + private SortedQueueEntry _prev; private String _key; private Colour _colour = Colour.BLACK; - private SortedQueueEntryImpl _parent; - private SortedQueueEntryImpl _left; - private SortedQueueEntryImpl _right; + private SortedQueueEntry _parent; + private SortedQueueEntry _left; + private SortedQueueEntry _right; - public SortedQueueEntryImpl(final SortedQueueEntryList queueEntryList) + public SortedQueueEntry(final SortedQueueEntryList queueEntryList) { super(queueEntryList); } - public SortedQueueEntryImpl(final SortedQueueEntryList queueEntryList, - final ServerMessage message, final long entryId) + public SortedQueueEntry(final SortedQueueEntryList queueEntryList, + final ServerMessage message, final long entryId) { super(queueEntryList, message, entryId); } @Override - public int compareTo(final QueueEntry o) + public int compareTo(final SortedQueueEntry o) { - final String otherKey = ((SortedQueueEntryImpl) o)._key; + final String otherKey = o._key; final int compare = _key == null ? (otherKey == null ? 0 : -1) : otherKey == null ? 1 : _key.compareTo(otherKey); return compare == 0 ? super.compareTo(o) : compare; } @@ -69,33 +69,33 @@ public class SortedQueueEntryImpl extends QueueEntryImpl return _key; } - public SortedQueueEntryImpl getLeft() + public SortedQueueEntry getLeft() { return _left; } - public SortedQueueEntryImpl getNextNode() + public SortedQueueEntry getNextNode() { return _next; } @Override - public SortedQueueEntryImpl getNextValidEntry() + public SortedQueueEntry getNextValidEntry() { return getNextNode(); } - public SortedQueueEntryImpl getParent() + public SortedQueueEntry getParent() { return _parent; } - public SortedQueueEntryImpl getPrev() + public SortedQueueEntry getPrev() { return _prev; } - public SortedQueueEntryImpl getRight() + public SortedQueueEntry getRight() { return _right; } @@ -110,27 +110,27 @@ public class SortedQueueEntryImpl extends QueueEntryImpl _key = key; } - public void setLeft(final SortedQueueEntryImpl left) + public void setLeft(final SortedQueueEntry left) { _left = left; } - public void setNext(final SortedQueueEntryImpl next) + public void setNext(final SortedQueueEntry next) { _next = next; } - public void setParent(final SortedQueueEntryImpl parent) + public void setParent(final SortedQueueEntry parent) { _parent = parent; } - public void setPrev(final SortedQueueEntryImpl prev) + public void setPrev(final SortedQueueEntry prev) { _prev = prev; } - public void setRight(final SortedQueueEntryImpl right) + public void setRight(final SortedQueueEntry right) { _right = right; } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java index 336ee566eb..05b874cd91 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java @@ -21,7 +21,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.queue.SortedQueueEntryImpl.Colour; +import org.apache.qpid.server.queue.SortedQueueEntry.Colour; /** * A sorted implementation of QueueEntryList. @@ -30,28 +30,28 @@ import org.apache.qpid.server.queue.SortedQueueEntryImpl.Colour; * ISBN-13: 978-0262033848 * see http://en.wikipedia.org/wiki/Red-black_tree */ -public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl> +public class SortedQueueEntryList implements SimpleQueueEntryList<SortedQueueEntry, SortedQueue, SortedQueueEntryList> { - private final SortedQueueEntryImpl _head; - private SortedQueueEntryImpl _root; + private final SortedQueueEntry _head; + private SortedQueueEntry _root; private long _entryId = Long.MIN_VALUE; private final Object _lock = new Object(); - private final AMQQueue<QueueConsumer> _queue; + private final SortedQueue _queue; private final String _propertyName; - public SortedQueueEntryList(final AMQQueue<QueueConsumer> queue, final String propertyName) + public SortedQueueEntryList(final SortedQueue queue, final String propertyName) { _queue = queue; - _head = new SortedQueueEntryImpl(this); + _head = new SortedQueueEntry(this); _propertyName = propertyName; } - public AMQQueue<QueueConsumer> getQueue() + public SortedQueue getQueue() { return _queue; } - public SortedQueueEntryImpl add(final ServerMessage message) + public SortedQueueEntry add(final ServerMessage message) { synchronized(_lock) { @@ -62,7 +62,7 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl key = val.toString(); } - final SortedQueueEntryImpl entry = new SortedQueueEntryImpl(this,message, ++_entryId); + final SortedQueueEntry entry = new SortedQueueEntry(this,message, ++_entryId); entry.setKey(key); insert(entry); @@ -75,9 +75,9 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl * Red Black Tree insert implementation. * @param entry the entry to insert. */ - private void insert(final SortedQueueEntryImpl entry) + private void insert(final SortedQueueEntry entry) { - SortedQueueEntryImpl node = _root; + SortedQueueEntry node; if((node = _root) == null) { _root = entry; @@ -87,7 +87,7 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl } else { - SortedQueueEntryImpl parent = null; + SortedQueueEntry parent = null; while(node != null) { parent = node; @@ -105,7 +105,7 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl if(entry.compareTo(parent) < 0) { parent.setLeft(entry); - final SortedQueueEntryImpl prev = parent.getPrev(); + final SortedQueueEntry prev = parent.getPrev(); entry.setNext(parent); prev.setNext(entry); entry.setPrev(prev); @@ -115,7 +115,7 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl { parent.setRight(entry); - final SortedQueueEntryImpl next = parent.getNextValidEntry(); + final SortedQueueEntry next = parent.getNextValidEntry(); entry.setNext(next); parent.setNext(entry); @@ -130,15 +130,15 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl insertFixup(entry); } - private void insertFixup(SortedQueueEntryImpl entry) + private void insertFixup(SortedQueueEntry entry) { while(isParentColour(entry, Colour.RED)) { - final SortedQueueEntryImpl grandparent = nodeGrandparent(entry); + final SortedQueueEntry grandparent = nodeGrandparent(entry); if(nodeParent(entry) == leftChild(grandparent)) { - final SortedQueueEntryImpl y = rightChild(grandparent); + final SortedQueueEntry y = rightChild(grandparent); if(isNodeColour(y, Colour.RED)) { setColour(nodeParent(entry), Colour.BLACK); @@ -160,7 +160,7 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl } else { - final SortedQueueEntryImpl y = leftChild(grandparent); + final SortedQueueEntry y = leftChild(grandparent); if(isNodeColour(y, Colour.RED)) { setColour(nodeParent(entry), Colour.BLACK); @@ -184,11 +184,11 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl _root.setColour(Colour.BLACK); } - private void leftRotate(final SortedQueueEntryImpl entry) + private void leftRotate(final SortedQueueEntry entry) { if(entry != null) { - final SortedQueueEntryImpl rightChild = rightChild(entry); + final SortedQueueEntry rightChild = rightChild(entry); entry.setRight(rightChild.getLeft()); if(entry.getRight() != null) { @@ -212,11 +212,11 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl } } - private void rightRotate(final SortedQueueEntryImpl entry) + private void rightRotate(final SortedQueueEntry entry) { if(entry != null) { - final SortedQueueEntryImpl leftChild = leftChild(entry); + final SortedQueueEntry leftChild = leftChild(entry); entry.setLeft(leftChild.getRight()); if(entry.getLeft() != null) { @@ -240,7 +240,7 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl } } - private void setColour(final SortedQueueEntryImpl node, final Colour colour) + private void setColour(final SortedQueueEntry node, final Colour colour) { if(node != null) { @@ -248,45 +248,45 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl } } - private SortedQueueEntryImpl leftChild(final SortedQueueEntryImpl node) + private SortedQueueEntry leftChild(final SortedQueueEntry node) { return node == null ? null : node.getLeft(); } - private SortedQueueEntryImpl rightChild(final SortedQueueEntryImpl node) + private SortedQueueEntry rightChild(final SortedQueueEntry node) { return node == null ? null : node.getRight(); } - private SortedQueueEntryImpl nodeParent(final SortedQueueEntryImpl node) + private SortedQueueEntry nodeParent(final SortedQueueEntry node) { return node == null ? null : node.getParent(); } - private SortedQueueEntryImpl nodeGrandparent(final SortedQueueEntryImpl node) + private SortedQueueEntry nodeGrandparent(final SortedQueueEntry node) { return nodeParent(nodeParent(node)); } - private boolean isParentColour(final SortedQueueEntryImpl node, final SortedQueueEntryImpl.Colour colour) + private boolean isParentColour(final SortedQueueEntry node, final SortedQueueEntry.Colour colour) { return node != null && isNodeColour(node.getParent(), colour); } - protected boolean isNodeColour(final SortedQueueEntryImpl node, final SortedQueueEntryImpl.Colour colour) + protected boolean isNodeColour(final SortedQueueEntry node, final SortedQueueEntry.Colour colour) { return (node == null ? Colour.BLACK : node.getColour()) == colour; } - public SortedQueueEntryImpl next(final SortedQueueEntryImpl node) + public SortedQueueEntry next(final SortedQueueEntry node) { synchronized(_lock) { if(node.isDeleted() && _head != node) { - SortedQueueEntryImpl current = _head; - SortedQueueEntryImpl next; + SortedQueueEntry current = _head; + SortedQueueEntry next; while(current != null) { next = current.getNextValidEntry(); @@ -308,22 +308,22 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl } } - public QueueEntryIterator<SortedQueueEntryImpl> iterator() + public QueueEntryIterator<SortedQueueEntry,SortedQueue,SortedQueueEntryList,QueueConsumer<?,SortedQueueEntry,SortedQueue,SortedQueueEntryList>> iterator() { return new QueueEntryIteratorImpl(_head); } - public SortedQueueEntryImpl getHead() + public SortedQueueEntry getHead() { return _head; } - protected SortedQueueEntryImpl getRoot() + protected SortedQueueEntry getRoot() { return _root; } - public void entryDeleted(final SortedQueueEntryImpl entry) + public void entryDeleted(final SortedQueueEntry entry) { synchronized(_lock) { @@ -336,20 +336,20 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl // Then deal with the easy doubly linked list deletion (need to do // this after the swap as the swap uses next - final SortedQueueEntryImpl prev = entry.getPrev(); + final SortedQueueEntry prev = entry.getPrev(); if(prev != null) { prev.setNext(entry.getNextValidEntry()); } - final SortedQueueEntryImpl next = entry.getNextValidEntry(); + final SortedQueueEntry next = entry.getNextValidEntry(); if(next != null) { next.setPrev(prev); } // now deal with splicing - final SortedQueueEntryImpl chosenChild; + final SortedQueueEntry chosenChild; if(leftChild(entry) != null) { @@ -428,14 +428,14 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl /** * Swaps the position of the node in the tree with it's successor * (that is the node with the next highest key) - * @param entry + * @param entry the entry to be swapped with its successor */ - private void swapWithSuccessor(final SortedQueueEntryImpl entry) + private void swapWithSuccessor(final SortedQueueEntry entry) { - final SortedQueueEntryImpl next = entry.getNextValidEntry(); - final SortedQueueEntryImpl nextParent = next.getParent(); - final SortedQueueEntryImpl nextLeft = next.getLeft(); - final SortedQueueEntryImpl nextRight = next.getRight(); + final SortedQueueEntry next = entry.getNextValidEntry(); + final SortedQueueEntry nextParent = next.getParent(); + final SortedQueueEntry nextLeft = next.getLeft(); + final SortedQueueEntry nextRight = next.getRight(); final Colour nextColour = next.getColour(); // Special case - the successor is the right child of the node @@ -530,7 +530,7 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl } } - private void deleteFixup(SortedQueueEntryImpl entry) + private void deleteFixup(SortedQueueEntry entry) { int i = 0; while(entry != null && entry != _root @@ -545,7 +545,7 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl if(entry == leftChild(nodeParent(entry))) { - SortedQueueEntryImpl rightSibling = rightChild(nodeParent(entry)); + SortedQueueEntry rightSibling = rightChild(nodeParent(entry)); if(isNodeColour(rightSibling, Colour.RED)) { setColour(rightSibling, Colour.BLACK); @@ -578,7 +578,7 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl } else { - SortedQueueEntryImpl leftSibling = leftChild(nodeParent(entry)); + SortedQueueEntry leftSibling = leftChild(nodeParent(entry)); if(isNodeColour(leftSibling, Colour.RED)) { setColour(leftSibling, Colour.BLACK); @@ -613,16 +613,16 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl setColour(entry, Colour.BLACK); } - private Colour getColour(final SortedQueueEntryImpl x) + private Colour getColour(final SortedQueueEntry x) { return x == null ? null : x.getColour(); } - public class QueueEntryIteratorImpl implements QueueEntryIterator<SortedQueueEntryImpl> + public class QueueEntryIteratorImpl implements QueueEntryIterator<SortedQueueEntry,SortedQueue,SortedQueueEntryList,QueueConsumer<?,SortedQueueEntry,SortedQueue,SortedQueueEntryList>> { - private SortedQueueEntryImpl _lastNode; + private SortedQueueEntry _lastNode; - public QueueEntryIteratorImpl(final SortedQueueEntryImpl startNode) + public QueueEntryIteratorImpl(final SortedQueueEntry startNode) { _lastNode = startNode; } @@ -632,7 +632,7 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl return next(_lastNode) == null; } - public SortedQueueEntryImpl getNode() + public SortedQueueEntry getNode() { return _lastNode; } @@ -641,7 +641,7 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl { if(!atTail()) { - SortedQueueEntryImpl nextNode = next(_lastNode); + SortedQueueEntry nextNode = next(_lastNode); while(nextNode.isDeleted() && next(nextNode) != null) { nextNode = next(nextNode); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java index 87c79178f0..69ffcd67a7 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java @@ -19,7 +19,7 @@ */ package org.apache.qpid.server.queue; -public class SortedQueueEntryListFactory implements QueueEntryListFactory +public class SortedQueueEntryListFactory implements QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList> { private final String _propertyName; @@ -30,9 +30,8 @@ public class SortedQueueEntryListFactory implements QueueEntryListFactory } @Override - public QueueEntryList<SortedQueueEntryImpl> createQueueEntryList(final AMQQueue queue) + public SortedQueueEntryList createQueueEntryList(final SortedQueue queue) { return new SortedQueueEntryList(queue, _propertyName); } - } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java new file mode 100644 index 0000000000..e2ccdc45cf --- /dev/null +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.virtualhost.VirtualHost; + +import java.util.Map; +import java.util.UUID; + +public class StandardQueue extends SimpleAMQQueue<StandardQueueEntry,StandardQueue,StandardQueueEntryList> +{ + public StandardQueue(final UUID id, + final String name, + final boolean durable, + final String owner, + final boolean autoDelete, + final boolean exclusive, + final VirtualHost virtualHost, + final Map<String, Object> arguments) + { + super(id, name, durable, owner, autoDelete, exclusive, virtualHost, new StandardQueueEntryList.Factory(), arguments); + } +} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntry.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntry.java new file mode 100644 index 0000000000..368015e9c0 --- /dev/null +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntry.java @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.message.ServerMessage; + +public class StandardQueueEntry extends OrderedQueueEntry<StandardQueueEntry, StandardQueue, StandardQueueEntryList> +{ + protected StandardQueueEntry(final StandardQueueEntryList queueEntryList) + { + super(queueEntryList); + } + + public StandardQueueEntry(final StandardQueueEntryList queueEntryList, + final ServerMessage message, + final long entryId) + { + super(queueEntryList, message, entryId); + } + + public StandardQueueEntry(final StandardQueueEntryList queueEntryList, final ServerMessage message) + { + super(queueEntryList, message); + } + + +} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java new file mode 100644 index 0000000000..11ad04e61c --- /dev/null +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.message.ServerMessage; + +public class StandardQueueEntryList extends OrderedQueueEntryList<StandardQueueEntry, StandardQueue, StandardQueueEntryList> +{ + + private static final HeadCreator<StandardQueueEntry, StandardQueue, StandardQueueEntryList> HEAD_CREATOR = new HeadCreator<StandardQueueEntry, StandardQueue, StandardQueueEntryList>() + { + @Override + public StandardQueueEntry createHead(final StandardQueueEntryList list) + { + return new StandardQueueEntry(list); + } + }; + + public StandardQueueEntryList(final StandardQueue queue) + { + super(queue, HEAD_CREATOR); + } + + + protected StandardQueueEntry createQueueEntry(ServerMessage<?> message) + { + return new StandardQueueEntry(this, message); + } + + static class Factory implements QueueEntryListFactory<StandardQueueEntry, StandardQueue, StandardQueueEntryList> + { + + public StandardQueueEntryList createQueueEntryList(StandardQueue queue) + { + return new StandardQueueEntryList(queue); + } + } + +} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java index de7369f5ed..eff9bdf433 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java @@ -47,7 +47,7 @@ public class DurableConfigurationStoreHelper Queue.EXCLUSIVE, Queue.ALTERNATE_EXCHANGE)); - public static void updateQueue(DurableConfigurationStore store, AMQQueue<? extends Consumer> queue) throws AMQStoreException + public static void updateQueue(DurableConfigurationStore store, AMQQueue<?,?,?> queue) throws AMQStoreException { Map<String, Object> attributesMap = new LinkedHashMap<String, Object>(); attributesMap.put(Queue.NAME, queue.getName()); @@ -72,7 +72,7 @@ public class DurableConfigurationStoreHelper store.update(queue.getId(), QUEUE, attributesMap); } - public static void createQueue(DurableConfigurationStore store, AMQQueue<? extends Consumer> queue) + public static void createQueue(DurableConfigurationStore store, AMQQueue<?,?,?> queue) throws AMQStoreException { Map<String, Object> attributesMap = new HashMap<String, Object>(); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java index f60f173de9..4dd9831454 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java @@ -27,7 +27,7 @@ import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.XMLConfiguration; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.queue.AMQPriorityQueue; +import org.apache.qpid.server.queue.PriorityQueue; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.util.BrokerTestHelper; @@ -113,17 +113,17 @@ public class VirtualHostConfigurationTest extends QpidTestCase // Check that atest was a priority queue with 5 priorities AMQQueue atest = vhost.getQueue("atest"); - assertTrue(atest instanceof AMQPriorityQueue); - assertEquals(5, ((AMQPriorityQueue) atest).getPriorities()); + assertTrue(atest instanceof PriorityQueue); + assertEquals(5, ((PriorityQueue) atest).getPriorities()); // Check that ptest was a priority queue with 10 priorities AMQQueue ptest = vhost.getQueue("ptest"); - assertTrue(ptest instanceof AMQPriorityQueue); - assertEquals(10, ((AMQPriorityQueue) ptest).getPriorities()); + assertTrue(ptest instanceof PriorityQueue); + assertEquals(10, ((PriorityQueue) ptest).getPriorities()); // Check that ntest wasn't a priority queue AMQQueue ntest = vhost.getQueue("ntest"); - assertFalse(ntest instanceof AMQPriorityQueue); + assertFalse(ntest instanceof PriorityQueue); } public void testQueueAlerts() throws Exception diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java index ea9d0ac693..8cab2e9058 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java @@ -74,7 +74,7 @@ public class TopicExchangeTest extends QpidTestCase public void testNoRoute() throws AMQException { - AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*#b", false, null, false, false, + AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*#b", false, null, false, false, false, null); _exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null)); @@ -86,7 +86,7 @@ public class TopicExchangeTest extends QpidTestCase public void testDirectMatch() throws AMQException { - AMQQueue<Consumer> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "ab", false, null, false, false, + AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "ab", false, null, false, false, false, null); _exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null)); @@ -109,7 +109,7 @@ public class TopicExchangeTest extends QpidTestCase public void testStarMatch() throws AMQException { - AMQQueue<Consumer> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*", false, null, false, false, false, null); + AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*", false, null, false, false, false, null); _exchange.registerQueue(new Binding(null, "a.*",queue, _exchange, null)); @@ -140,7 +140,7 @@ public class TopicExchangeTest extends QpidTestCase public void testHashMatch() throws AMQException { - AMQQueue<Consumer> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, false, null); + AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, false, null); _exchange.registerQueue(new Binding(null, "a.#",queue, _exchange, null)); @@ -191,7 +191,7 @@ public class TopicExchangeTest extends QpidTestCase public void testMidHash() throws AMQException { - AMQQueue<Consumer> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, + AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, false, null); _exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null)); @@ -217,7 +217,7 @@ public class TopicExchangeTest extends QpidTestCase public void testMatchAfterHash() throws AMQException { - AMQQueue<Consumer> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, + AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, false, null); _exchange.registerQueue(new Binding(null, "a.*.#.b.c",queue, _exchange, null)); @@ -256,7 +256,7 @@ public class TopicExchangeTest extends QpidTestCase public void testHashAfterHash() throws AMQException { - AMQQueue<Consumer> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, + AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, false, null); _exchange.registerQueue(new Binding(null, "a.*.#.b.c.#.d",queue, _exchange, null)); @@ -278,7 +278,7 @@ public class TopicExchangeTest extends QpidTestCase public void testHashHash() throws AMQException { - AMQQueue<Consumer> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, + AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, false, null); _exchange.registerQueue(new Binding(null, "a.#.*.#.d",queue, _exchange, null)); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java index bcf54c97a4..febce9ea2e 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java @@ -231,7 +231,7 @@ public class AMQQueueFactoryTest extends QpidTestCase false, attributes); - assertEquals("Queue not a priority queue", AMQPriorityQueue.class, queue.getClass()); + assertEquals("Queue not a priority queue", PriorityQueue.class, queue.getClass()); verifyQueueRegistered("testPriorityQueue"); verifyRegisteredQueueCount(1); } @@ -246,7 +246,7 @@ public class AMQQueueFactoryTest extends QpidTestCase false, false, null); - assertEquals("Queue not a simple queue", SimpleAMQQueue.class, queue.getClass()); + assertEquals("Queue not a simple queue", StandardQueue.class, queue.getClass()); verifyQueueRegistered(queueName); //verify that no alternate exchange or DLQ were produced diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java index d67c70c831..c2291f5eed 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java @@ -28,6 +28,9 @@ import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.virtualhost.VirtualHost; +import java.util.Collections; +import java.util.UUID; + public class ConflationQueueListTest extends TestCase { private static final String CONFLATION_KEY = "CONFLATION_KEY"; @@ -37,13 +40,15 @@ public class ConflationQueueListTest extends TestCase private static final String TEST_KEY_VALUE2 = "testKeyValue2"; private ConflationQueueList _list; - private AMQQueue _queue = createTestQueue(); + private ConflationQueue _queue; @Override protected void setUp() throws Exception { super.setUp(); - _list = new ConflationQueueList(_queue, CONFLATION_KEY); + _queue = new ConflationQueue(UUID.randomUUID(), getName(), false, null, false, false, mock(VirtualHost.class), + Collections.<String,Object>emptyMap(),CONFLATION_KEY); + _list = _queue.getEntries(); } public void testListHasNoEntries() @@ -175,7 +180,8 @@ public class ConflationQueueListTest extends TestCase private int countEntries(ConflationQueueList list) { - QueueEntryIterator<SimpleQueueEntryImpl> iterator = list.iterator(); + QueueEntryIterator<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList,QueueConsumer<?,ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList>> iterator = + list.iterator(); int count = 0; while(iterator.advance()) { diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index 66c12717db..d1bc5effc0 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -33,6 +33,7 @@ import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.consumer.ConsumerTarget; +import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -45,7 +46,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.CopyOnWriteArrayList; -public class MockAMQQueue implements AMQQueue<QueueConsumer> +public class MockAMQQueue implements AMQQueue { private boolean _deleted = false; private String _name; @@ -178,6 +179,8 @@ public class MockAMQQueue implements AMQQueue<QueueConsumer> return null; } + + public boolean isDurable() { return false; @@ -210,32 +213,55 @@ public class MockAMQQueue implements AMQQueue<QueueConsumer> } @Override - public QueueConsumer addConsumer(final ConsumerTarget target, + public boolean resend(final QueueEntry entry, final Consumer consumer) throws AMQException + { + return false; + } + + @Override + public void addQueueDeleteTask(final Action task) + { + + } + + @Override + public void enqueue(final ServerMessage message, final Action action) throws AMQException + { + + } + + @Override + public int compareTo(final Object o) + { + return 0; + } + + @Override + public Consumer addConsumer(final ConsumerTarget target, final FilterManager filters, - final Class<? extends ServerMessage> messageClass, + final Class messageClass, final String consumerName, - final EnumSet<Consumer.Option> options) throws AMQException + final EnumSet options) throws AMQException { return new QueueConsumer(filters, messageClass, options.contains(Consumer.Option.ACQUIRES), - options.contains(Consumer.Option.SEES_REQUEUES), consumerName, - options.contains(Consumer.Option.TRANSIENT), target ); + options.contains(Consumer.Option.SEES_REQUEUES), consumerName, + options.contains(Consumer.Option.TRANSIENT), target ); } + public String getName() { return _name; } - @Override - public int send(final ServerMessage message, - final InstanceProperties instanceProperties, - final ServerTransaction txn, - final Action<MessageInstance<? extends Consumer>> postEnqueueAction) + public int send(final ServerMessage message, + final InstanceProperties instanceProperties, + final ServerTransaction txn, + final Action postEnqueueAction) { return 0; } - public Collection<QueueConsumer> getConsumers() { return Collections.emptyList(); @@ -312,38 +338,39 @@ public class MockAMQQueue implements AMQQueue<QueueConsumer> return getMessageCount(); } - public void enqueue(ServerMessage message, Action<MessageInstance<? extends Consumer>> action) throws AMQException - { - } - public void requeue(QueueEntry entry) { } - public void dequeue(QueueEntry entry, Consumer sub) + public void dequeue(QueueEntry entry) { } - public boolean resend(QueueEntry entry, Consumer consumer) throws AMQException + public boolean resend(QueueEntry entry, QueueConsumer consumer) throws AMQException { return false; } - public void addQueueDeleteTask(Action<AMQQueue> task) + @Override + public void removeQueueDeleteTask(final Action task) { + } - public void removeQueueDeleteTask(final Action<AMQQueue> task) + @Override + public void decrementUnackedMsgCount(final QueueEntry queueEntry) { + } - public List<QueueEntry> getMessagesOnTheQueue() + @Override + public List getMessagesOnTheQueue() { return null; } - public List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId) + public List getMessagesOnTheQueue(long fromMessageId, long toMessageId) { return null; } @@ -363,7 +390,7 @@ public class MockAMQQueue implements AMQQueue<QueueConsumer> return null; } - public List<QueueEntry> getMessagesRangeOnTheQueue(long fromPosition, long toPosition) + public List getMessagesRangeOnTheQueue(long fromPosition, long toPosition) { return null; } @@ -570,10 +597,6 @@ public class MockAMQQueue implements AMQQueue<QueueConsumer> return 0; } - public void decrementUnackedMsgCount(QueueEntry queueEntry) - { - - } public long getUnackedMessageCount() { @@ -617,4 +640,5 @@ public class MockAMQQueue implements AMQQueue<QueueConsumer> { return null; } + } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java index 89b366567d..386bb46044 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java @@ -32,7 +32,7 @@ import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.StateChangeListener; -public class MockQueueEntry implements QueueEntry +public class MockMessageInstance implements MessageInstance<MockMessageInstance,Consumer> { private ServerMessage _message; @@ -42,13 +42,15 @@ public class MockQueueEntry implements QueueEntry return false; } - public boolean acquire(QueueConsumer sub) + @Override + public int getMaximumDeliveryCount() { - return false; + return 0; } @Override - public int getMaximumDeliveryCount() + public int routeToAlternate(final Action<? super MessageInstance<?, ? extends Consumer>> action, + final ServerTransaction txn) { return 0; } @@ -58,27 +60,24 @@ public class MockQueueEntry implements QueueEntry return false; } - public boolean isAcquiredBy(QueueConsumer consumer) + @Override + public boolean isAcquiredBy(final Consumer consumer) { return false; } - public void addStateChangeListener(StateChangeListener<MessageInstance<QueueConsumer>, State> listener) - { - - } - public void delete() { } - public int routeToAlternate(final Action<MessageInstance<? extends Consumer>> action, final ServerTransaction txn) + public boolean expired() throws AMQException { - return 0; + return false; } - public boolean expired() throws AMQException + @Override + public boolean acquire(final Consumer sub) { return false; } @@ -88,7 +87,7 @@ public class MockQueueEntry implements QueueEntry return false; } - public QueueConsumer getDeliveredConsumer() + public Consumer getDeliveredConsumer() { return null; } @@ -103,11 +102,6 @@ public class MockQueueEntry implements QueueEntry return _message; } - public AMQQueue<QueueConsumer> getQueue() - { - return null; - } - public long getSize() { return 0; @@ -118,32 +112,19 @@ public class MockQueueEntry implements QueueEntry return false; } - - public boolean isQueueDeleted() + public void reject() { - - return false; } - - public boolean isRejectedBy(QueueConsumer consumer) + @Override + public boolean isRejectedBy(final Consumer consumer) { - return false; } - public void reject() - { - - - } - - public void release() { - - } @Override @@ -153,26 +134,18 @@ public class MockQueueEntry implements QueueEntry } - public boolean removeStateChangeListener(StateChangeListener<MessageInstance<QueueConsumer>, State> listener) - { - - return false; - } - public void setRedelivered() { - - } public AMQMessageHeader getMessageHeader() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; } public boolean isPersistent() { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public boolean isRedelivered() @@ -180,12 +153,6 @@ public class MockQueueEntry implements QueueEntry return false; } - public int compareTo(QueueEntry o) - { - - return 0; - } - public void setMessage(ServerMessage msg) { _message = msg; @@ -195,31 +162,32 @@ public class MockQueueEntry implements QueueEntry { return false; } - - public QueueEntry getNextNode() + @Override + public int getDeliveryCount() { - return null; + return 0; } - public QueueEntry getNextValidEntry() + @Override + public void incrementDeliveryCount() { - return null; } @Override - public int getDeliveryCount() + public void decrementDeliveryCount() { - return 0; } @Override - public void incrementDeliveryCount() + public void addStateChangeListener(final StateChangeListener<? super MockMessageInstance, State> listener) { + } @Override - public void decrementDeliveryCount() + public boolean removeStateChangeListener(final StateChangeListener<? super MockMessageInstance, State> listener) { + return false; } @Override @@ -237,6 +205,6 @@ public class MockQueueEntry implements QueueEntry @Override public TransactionLogResource getOwningResource() { - return getQueue(); + return null; } } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java index e8c0470915..3db5d0fb62 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java @@ -26,12 +26,16 @@ import static org.mockito.Mockito.when; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; +import java.util.Collections; +import java.util.UUID; + public class PriorityQueueListTest extends QpidTestCase { private static final byte[] PRIORITIES = {4, 5, 5, 4}; - PriorityQueueList _list = new PriorityQueueList(null, 10); + PriorityQueueList _list; private QueueEntry _priority4message1; private QueueEntry _priority4message2; @@ -42,6 +46,17 @@ public class PriorityQueueListTest extends QpidTestCase { QueueEntry[] entries = new QueueEntry[PRIORITIES.length]; + PriorityQueue queue = new PriorityQueue(UUID.randomUUID(), + getName(), + false, + null, + false, + false, + mock(VirtualHost.class), + Collections.<String,Object>emptyMap(), + 10); + _list = queue.getEntries(); + for (int i = 0; i < PRIORITIES.length; i++) { ServerMessage<?> message = mock(ServerMessage.class); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java index ced00dc578..56cd29b0bd 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java @@ -24,6 +24,8 @@ import java.util.Collections; import junit.framework.AssertionFailedError; import org.apache.qpid.AMQException; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; @@ -34,9 +36,10 @@ import java.util.EnumSet; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.consumer.Consumer; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class AMQPriorityQueueTest extends SimpleAMQQueueTest +public class PriorityQueueTest extends SimpleAMQQueueTestBase { @Override diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java index 95139d8740..b67b2dda32 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java @@ -22,12 +22,18 @@ import junit.framework.TestCase; import org.apache.qpid.AMQException; import org.apache.qpid.server.consumer.ConsumerTarget; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.message.MessageInstance.EntryState; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.virtualhost.VirtualHost; import java.lang.reflect.Field; +import java.util.Collections; +import java.util.UUID; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -57,6 +63,15 @@ public abstract class QueueEntryImplTestBase extends TestCase _queueEntry3 = getQueueEntryImpl(3); } + + protected void mockLogging() + { + final LogActor logActor = mock(LogActor.class); + when(logActor.getRootMessageLogger()).thenReturn(mock(RootMessageLogger.class)); + CurrentActor.setDefault(logActor); + } + + public void testAcquire() { assertTrue("Queue entry should be in AVAILABLE state before invoking of acquire method", @@ -185,7 +200,9 @@ public abstract class QueueEntryImplTestBase extends TestCase { int numberOfEntries = 5; QueueEntryImpl[] entries = new QueueEntryImpl[numberOfEntries]; - SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test")); + StandardQueue queue = new StandardQueue(UUID.randomUUID(), getName(), false, null, false, false, + mock(VirtualHost.class), Collections.<String,Object>emptyMap()); + OrderedQueueEntryList queueEntryList = queue.getEntries(); // create test entries for(int i = 0; i < numberOfEntries ; i++) diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java index a1bc9e50c7..3af268c189 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java @@ -25,6 +25,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.consumer.Consumer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -32,20 +33,21 @@ import static org.mockito.Mockito.when; /** * Abstract test class for QueueEntryList implementations. */ -public abstract class QueueEntryListTestBase extends TestCase +public abstract class QueueEntryListTestBase<E extends QueueEntry<E,Q,C>, Q extends AMQQueue<E,Q,C>, L extends QueueEntryList<E,Q,L,C>, C extends Consumer> extends TestCase { - protected static final AMQQueue _testQueue = new MockAMQQueue("test"); - public abstract QueueEntryList<QueueEntry> getTestList(); - public abstract QueueEntryList<QueueEntry> getTestList(boolean newList); + public abstract L getTestList(); + public abstract L getTestList(boolean newList); public abstract long getExpectedFirstMsgId(); public abstract int getExpectedListLength(); public abstract ServerMessage getTestMessageToAdd() throws AMQException; public void testGetQueue() { - assertEquals("Unexpected head entry returned by getHead()", getTestList().getQueue(), _testQueue); + assertEquals("Unexpected head entry returned by getHead()", getTestList().getQueue(), getTestQueue()); } + protected abstract Q getTestQueue(); + /** * Test to add a message with properties specific to the queue type. * @see QueueEntryListTestBase#getTestList() @@ -54,10 +56,10 @@ public abstract class QueueEntryListTestBase extends TestCase */ public void testAddSpecificMessage() throws AMQException { - final QueueEntryList<QueueEntry> list = getTestList(); + final L list = getTestList(); list.add(getTestMessageToAdd()); - final QueueEntryIterator<?> iter = list.iterator(); + final QueueEntryIterator<E,Q,L,C> iter = list.iterator(); int count = 0; while(iter.advance()) { @@ -75,11 +77,11 @@ public abstract class QueueEntryListTestBase extends TestCase */ public void testAddGenericMessage() throws AMQException { - final QueueEntryList<QueueEntry> list = getTestList(); + final L list = getTestList(); final ServerMessage message = createServerMessage(666l); list.add(message); - final QueueEntryIterator<?> iter = list.iterator(); + final QueueEntryIterator<E,Q,L,C> iter = list.iterator(); int count = 0; while(iter.advance()) { @@ -109,8 +111,8 @@ public abstract class QueueEntryListTestBase extends TestCase */ public void testListNext() { - final QueueEntryList<QueueEntry> entryList = getTestList(); - QueueEntry entry = entryList.getHead(); + final L entryList = getTestList(); + E entry = entryList.getHead(); int count = 0; while(entryList.next(entry) != null) { @@ -127,7 +129,7 @@ public abstract class QueueEntryListTestBase extends TestCase */ public void testIterator() { - final QueueEntryIterator<?> iter = getTestList().iterator(); + final QueueEntryIterator<E,Q,L,C> iter = getTestList().iterator(); int count = 0; while(iter.advance()) { @@ -145,10 +147,10 @@ public abstract class QueueEntryListTestBase extends TestCase public void testDequeuedMessagedNotPresentInIterator() throws Exception { final int numberOfMessages = getExpectedListLength(); - final QueueEntryList<QueueEntry> entryList = getTestList(); + final L entryList = getTestList(); // dequeue all even messages - final QueueEntryIterator<?> it1 = entryList.iterator(); + final QueueEntryIterator<E,Q,L,C> it1 = entryList.iterator(); int counter = 0; while (it1.advance()) { @@ -161,7 +163,7 @@ public abstract class QueueEntryListTestBase extends TestCase } // iterate and check that dequeued messages are not returned by iterator - final QueueEntryIterator<?> it2 = entryList.iterator(); + final QueueEntryIterator<E,Q,L,C> it2 = entryList.iterator(); int counter2 = 0; while(it2.advance()) { @@ -180,7 +182,7 @@ public abstract class QueueEntryListTestBase extends TestCase */ public void testGetHead() { - final QueueEntry head = getTestList().getHead(); + final E head = getTestList().getHead(); assertNull("Head entry should not contain an actual message", head.getMessage()); assertEquals("Unexpected message id for first list entry", getExpectedFirstMsgId(), getTestList().next(head) .getMessage().getMessageNumber()); @@ -192,16 +194,16 @@ public abstract class QueueEntryListTestBase extends TestCase */ public void testEntryDeleted() { - final QueueEntry head = getTestList().getHead(); + final E head = getTestList().getHead(); - final QueueEntry first = getTestList().next(head); + final E first = getTestList().next(head); first.delete(); - final QueueEntry second = getTestList().next(head); + final E second = getTestList().next(head); assertNotSame("After deletion the next entry should be different", first.getMessage().getMessageNumber(), second .getMessage().getMessageNumber()); - final QueueEntry third = getTestList().next(first); + final E third = getTestList().next(first); assertEquals("After deletion the deleted nodes next node should be the same as the next from head", second .getMessage().getMessageNumber(), third.getMessage().getMessageNumber()); } @@ -215,11 +217,11 @@ public abstract class QueueEntryListTestBase extends TestCase */ public void testIteratorIgnoresDeletedFinalNode() throws Exception { - QueueEntryList<QueueEntry> list = getTestList(true); + L list = getTestList(true); int i = 0; - QueueEntry queueEntry1 = list.add(createServerMessage(i++)); - QueueEntry queueEntry2 = list.add(createServerMessage(i++)); + E queueEntry1 = list.add(createServerMessage(i++)); + E queueEntry2 = list.add(createServerMessage(i++)); assertSame(queueEntry2, list.next(queueEntry1)); assertNull(list.next(queueEntry2)); @@ -228,7 +230,7 @@ public abstract class QueueEntryListTestBase extends TestCase queueEntry2.delete(); assertTrue("Deleting node should have succeeded", queueEntry2.isDeleted()); - QueueEntryIterator<QueueEntry> iter = list.iterator(); + QueueEntryIterator<E,Q,L,C> iter = list.iterator(); //verify the iterator isn't 'atTail', can advance, and returns the 1st QueueEntry assertFalse("Iterator should not have been 'atTail'", iter.atTail()); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java index 674af36b77..87182dc8dc 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java @@ -22,7 +22,7 @@ package org.apache.qpid.server.queue; import junit.framework.Assert; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.queue.SortedQueueEntryImpl.Colour; +import org.apache.qpid.server.queue.SortedQueueEntry.Colour; /** * Test extension of SortedQueueEntryList that provides data structure validation tests. @@ -30,22 +30,28 @@ import org.apache.qpid.server.queue.SortedQueueEntryImpl.Colour; */ public class SelfValidatingSortedQueueEntryList extends SortedQueueEntryList { - public SelfValidatingSortedQueueEntryList(AMQQueue queue, String propertyName) + public SelfValidatingSortedQueueEntryList(SortedQueue queue, String propertyName) { super(queue, propertyName); } + @Override + public SortedQueue getQueue() + { + return super.getQueue(); + } + @Override /** Overridden to automatically check queue properties before and after. */ - public SortedQueueEntryImpl add(final ServerMessage message) + public SortedQueueEntry add(final ServerMessage message) { assertQueueProperties(); //before add - final SortedQueueEntryImpl result = super.add(message); + final SortedQueueEntry result = super.add(message); assertQueueProperties(); //after add return result; } @Override /** Overridden to automatically check queue properties before and after. */ - public void entryDeleted(SortedQueueEntryImpl entry) + public void entryDeleted(SortedQueueEntry entry) { assertQueueProperties(); //before delete super.entryDeleted(entry); @@ -73,7 +79,7 @@ public class SelfValidatingSortedQueueEntryList extends SortedQueueEntryList assertTreeIntegrity(getRoot()); } - public void assertTreeIntegrity(final SortedQueueEntryImpl node) + public void assertTreeIntegrity(final SortedQueueEntry node) { if(node == null) { @@ -109,7 +115,7 @@ public class SelfValidatingSortedQueueEntryList extends SortedQueueEntryList assertLeavesSameBlackPath(getRoot()); } - public int assertLeavesSameBlackPath(final SortedQueueEntryImpl node) + public int assertLeavesSameBlackPath(final SortedQueueEntry node) { if(node == null) { @@ -133,7 +139,7 @@ public class SelfValidatingSortedQueueEntryList extends SortedQueueEntryList assertChildrenOfRedAreBlack(getRoot()); } - public void assertChildrenOfRedAreBlack(final SortedQueueEntryImpl node) + public void assertChildrenOfRedAreBlack(final SortedQueueEntry node) { if(node == null) { diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTestBase.java index 5abc97cee9..45001bda50 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTestBase.java @@ -45,7 +45,6 @@ import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter; -import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.consumer.MockConsumer; import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.util.Action; @@ -56,14 +55,13 @@ import org.apache.qpid.test.utils.QpidTestCase; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -public class SimpleAMQQueueTest extends QpidTestCase +abstract class SimpleAMQQueueTestBase<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> extends QpidTestCase { - private static final Logger _logger = Logger.getLogger(SimpleAMQQueueTest.class); + private static final Logger _logger = Logger.getLogger(SimpleAMQQueueTestBase.class); - private SimpleAMQQueue _queue; + + private Q _queue; private VirtualHost _virtualHost; private String _qname = "qname"; private String _owner = "owner"; @@ -81,7 +79,7 @@ public class SimpleAMQQueueTest extends QpidTestCase _virtualHost = BrokerTestHelper.createVirtualHost(getClass().getName()); - _queue = (SimpleAMQQueue) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), _qname, false, _owner, + _queue = (Q) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), _qname, false, _owner, false, false, false, _arguments); _exchange = (DirectExchange) _virtualHost.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME); @@ -107,7 +105,7 @@ public class SimpleAMQQueueTest extends QpidTestCase _queue.stop(); try { - _queue = (SimpleAMQQueue) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), null, + _queue = (Q) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), null, false, _owner, false, false, false, _arguments); assertNull("Queue was created", _queue); @@ -118,24 +116,14 @@ public class SimpleAMQQueueTest extends QpidTestCase e.getMessage().contains("name")); } - try - { - _queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), _qname, false, _owner, false,false, null, Collections.EMPTY_MAP); - assertNull("Queue was created", _queue); - } - catch (IllegalArgumentException e) - { - assertTrue("Exception was not about missing vhost", - e.getMessage().contains("Host")); - } - - _queue = (SimpleAMQQueue) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), + _queue = (Q) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), "differentName", false, _owner, false, false, false, _arguments); assertNotNull("Queue was not created", _queue); } + public void testGetVirtualHost() { assertEquals("Virtual host was wrong", _virtualHost, _queue.getVirtualHost()); @@ -150,11 +138,11 @@ public class SimpleAMQQueueTest extends QpidTestCase assertTrue("Queue was not bound to key", _exchange.isBound(_routingKey,_queue)); assertEquals("Exchange binding count", 1, - _queue.getBindings().size()); + _queue.getBindings().size()); assertEquals("Wrong exchange bound", _routingKey, _queue.getBindings().get(0).getBindingKey()); assertEquals("Wrong exchange bound", _exchange, - _queue.getBindings().get(0).getExchange()); + _queue.getBindings().get(0).getExchange()); _exchange.removeBinding(_routingKey, _queue, Collections.EMPTY_MAP); assertFalse("Routing key was still bound", @@ -495,22 +483,6 @@ public class SimpleAMQQueueTest extends QpidTestCase assertNotNull(ex); } - public void testAutoDeleteQueue() throws Exception - { - _queue.stop(); - _queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), _qname, false, null, true, false, _virtualHost, Collections.EMPTY_MAP); - _queue.setDeleteOnNoConsumers(true); - - ServerMessage message = createMessage(new Long(25)); - _consumer = _queue.addConsumer(_consumerTarget, null, message.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); - - _queue.enqueue(message, null); - _consumer.close(); - assertTrue("Queue was not deleted when consumer was removed", - _queue.isDeleted()); - } public void testResend() throws Exception { @@ -520,12 +492,12 @@ public class SimpleAMQQueueTest extends QpidTestCase _consumer = _queue.addConsumer(_consumerTarget, null, message.getClass(), "test", EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES)); - _queue.enqueue(message, new Action<MessageInstance<? extends Consumer>>() + _queue.enqueue(message, new Action<MessageInstance<?,? extends Consumer>>() { @Override - public void performAction(final MessageInstance<? extends Consumer> object) + public void performAction(final MessageInstance<?,? extends Consumer> object) { - QueueEntry entry = (QueueEntry) object; + QueueEntryImpl entry = (QueueEntryImpl) object; entry.setRedelivered(); try { @@ -612,7 +584,7 @@ public class SimpleAMQQueueTest extends QpidTestCase // Get non-existent 0th QueueEntry & check returned list was empty // (the position parameters in this method are indexed from 1) - List<QueueEntry> entries = _queue.getMessagesRangeOnTheQueue(0, 0); + List<E> entries = _queue.getMessagesRangeOnTheQueue(0, 0); assertTrue(entries.size() == 0); // Check that when 'from' is 0 it is ignored and the range continues from 1 @@ -681,22 +653,12 @@ public class SimpleAMQQueueTest extends QpidTestCase */ public void testProcessQueueWithUniqueSelectors() throws Exception { - TestSimpleQueueEntryListFactory factory = new TestSimpleQueueEntryListFactory(); - SimpleAMQQueue testQueue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), "testQueue", false,"testOwner", - false, false, _virtualHost, factory, null) - { - @Override - public void deliverAsync(QueueConsumer sub) - { - // do nothing, i.e prevent deliveries by the SubFlushRunner - // when registering the new consumers - } - }; + SimpleAMQQueue testQueue = createNonAsyncDeliverQueue(); // retrieve the QueueEntryList the queue creates and insert the test // messages, thus avoiding straight-through delivery attempts during //enqueue() process. - QueueEntryList list = factory.getQueueEntryList(); + QueueEntryList list = testQueue.getEntries(); assertNotNull("QueueEntryList should have been created", list); QueueEntry msg1 = list.add(createMessage(1L)); @@ -748,6 +710,12 @@ public class SimpleAMQQueueTest extends QpidTestCase verifyReceivedMessages(Collections.singletonList((MessageInstance)msg5), sub3.getMessages()); } + private SimpleAMQQueue createNonAsyncDeliverQueue() + { + TestSimpleQueueEntryListFactory factory = new TestSimpleQueueEntryListFactory(); + return new NonAsyncDeliverQueue(factory, getVirtualHost()); + } + /** * Tests that dequeued message is not present in the list returned form * {@link SimpleAMQQueue#getMessagesOnTheQueue()} @@ -764,7 +732,7 @@ public class SimpleAMQQueueTest extends QpidTestCase dequeueMessage(_queue, dequeueMessageIndex); // get messages on the queue - List<QueueEntry> entries = _queue.getMessagesOnTheQueue(); + List<E> entries = _queue.getMessagesOnTheQueue(); // assert queue entries assertEquals(messageNumber - 1, entries.size()); @@ -801,9 +769,9 @@ public class SimpleAMQQueueTest extends QpidTestCase dequeueMessage(_queue, dequeueMessageIndex); // get messages on the queue with filter accepting all available messages - List<QueueEntry> entries = _queue.getMessagesOnTheQueue(new QueueEntryFilter() + List<E> entries = _queue.getMessagesOnTheQueue(new QueueEntryFilter<E>() { - public boolean accept(QueueEntry entry) + public boolean accept(E entry) { return true; } @@ -855,12 +823,12 @@ public class SimpleAMQQueueTest extends QpidTestCase _queue.deleteMessageFromTop(); //get queue entries - List<QueueEntry> entries = _queue.getMessagesOnTheQueue(); + List<E> entries = _queue.getMessagesOnTheQueue(); // assert queue entries assertNotNull("Null is returned from getMessagesOnTheQueue", entries); assertEquals("Expected " + (messageNumber - 2) + " number of messages but recieved " + entries.size(), - messageNumber - 2, entries.size()); + messageNumber - 2, entries.size()); assertEquals("Expected first entry with id 2", 2l, (entries.get(0).getMessage()).getMessageNumber()); } @@ -891,241 +859,13 @@ public class SimpleAMQQueueTest extends QpidTestCase } // get queue entries - List<QueueEntry> entries = _queue.getMessagesOnTheQueue(); + List<E> entries = _queue.getMessagesOnTheQueue(); // assert queue entries assertNotNull(entries); assertEquals(0, entries.size()); } - /** - * Tests whether dequeued entry is sent to subscriber in result of - * invocation of {@link SimpleAMQQueue#processQueue(QueueRunner)} - */ - public void testProcessQueueWithDequeuedEntry() - { - // total number of messages to send - int messageNumber = 4; - int dequeueMessageIndex = 1; - - // create queue with overridden method deliverAsync - SimpleAMQQueue testQueue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), "test", - false, "testOwner", false, false, _virtualHost, null) - { - @Override - public void deliverAsync(QueueConsumer sub) - { - // do nothing - } - }; - - // put messages - List<QueueEntry> entries = enqueueGivenNumberOfMessages(testQueue, messageNumber); - - // dequeue message - dequeueMessage(testQueue, dequeueMessageIndex); - - // latch to wait for message receipt - final CountDownLatch latch = new CountDownLatch(messageNumber -1); - - // create a consumer - MockConsumer consumer = new MockConsumer() - { - /** - * Send a message and decrement latch - * @param entry - * @param batch - */ - public void send(MessageInstance entry, boolean batch) throws AMQException - { - super.send(entry, batch); - latch.countDown(); - } - }; - - try - { - // subscribe - testQueue.addConsumer(consumer, - null, - entries.get(0).getMessage().getClass(), - "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); - - // process queue - testQueue.processQueue(new QueueRunner(testQueue) - { - public void run() - { - // do nothing - } - }); - } - catch (AMQException e) - { - fail("Failure to process queue:" + e.getMessage()); - } - // wait up to 1 minute for message receipt - try - { - latch.await(1, TimeUnit.MINUTES); - } - catch (InterruptedException e1) - { - Thread.currentThread().interrupt(); - } - List<MessageInstance> expected = Arrays.asList((MessageInstance)entries.get(0), entries.get(2), entries.get(3)); - verifyReceivedMessages(expected, consumer.getMessages()); - } - - /** - * Tests that entry in dequeued state are not enqueued and not delivered to consumer - */ - public void testEnqueueDequeuedEntry() - { - // create a queue where each even entry is considered a dequeued - SimpleAMQQueue queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), "test", false, - "testOwner", false, false, _virtualHost, new QueueEntryListFactory() - { - public QueueEntryList createQueueEntryList(AMQQueue queue) - { - /** - * Override SimpleQueueEntryList to create a dequeued - * entries for messages with even id - */ - return new SimpleQueueEntryList(queue) - { - /** - * Entries with even message id are considered - * dequeued! - */ - protected SimpleQueueEntryImpl createQueueEntry(final ServerMessage message) - { - return new SimpleQueueEntryImpl(this, message) - { - - public boolean isDeleted() - { - return (message.getMessageNumber() % 2 == 0); - } - - public boolean isAvailable() - { - return !(message.getMessageNumber() % 2 == 0); - } - - @Override - public boolean acquire(QueueConsumer sub) - { - if(message.getMessageNumber() % 2 == 0) - { - return false; - } - else - { - return super.acquire(sub); - } - } - }; - } - }; - } - }, null); - // create a consumer - MockConsumer consumer = new MockConsumer(); - - // register consumer - try - { - queue.addConsumer(consumer, - null, - createMessage(-1l).getClass(), - "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); - } - catch (AMQException e) - { - fail("Failure to register consumer:" + e.getMessage()); - } - - // put test messages into a queue - putGivenNumberOfMessages(queue, 4); - - // assert received messages - List<MessageInstance> messages = consumer.getMessages(); - assertEquals("Only 2 messages should be returned", 2, messages.size()); - assertEquals("ID of first message should be 1", 1l, - (messages.get(0).getMessage()).getMessageNumber()); - assertEquals("ID of second message should be 3", 3l, - (messages.get(1).getMessage()).getMessageNumber()); - } - - public void testActiveConsumerCount() throws Exception - { - final SimpleAMQQueue queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), "testActiveConsumerCount", false, - "testOwner", false, false, _virtualHost, new SimpleQueueEntryList.Factory(), null); - - //verify adding an active consumer increases the count - final MockConsumer consumer1 = new MockConsumer(); - consumer1.setActive(true); - consumer1.setState(ConsumerTarget.State.ACTIVE); - assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount()); - queue.addConsumer(consumer1, - null, - createMessage(-1l).getClass(), - "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); - assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); - - //verify adding an inactive consumer doesn't increase the count - final MockConsumer consumer2 = new MockConsumer(); - consumer2.setActive(false); - consumer2.setState(ConsumerTarget.State.SUSPENDED); - assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); - queue.addConsumer(consumer2, - null, - createMessage(-1l).getClass(), - "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); - assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); - - //verify behaviour in face of expected state changes: - - //verify a consumer going suspended->active increases the count - consumer2.setState(ConsumerTarget.State.ACTIVE); - assertEquals("Unexpected active consumer count", 2, queue.getActiveConsumerCount()); - - //verify a consumer going active->suspended decreases the count - consumer2.setState(ConsumerTarget.State.SUSPENDED); - assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); - - //verify a consumer going suspended->closed doesn't change the count - consumer2.setState(ConsumerTarget.State.CLOSED); - assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); - - //verify a consumer going active->active doesn't change the count - consumer1.setState(ConsumerTarget.State.ACTIVE); - assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); - - consumer1.setState(ConsumerTarget.State.SUSPENDED); - assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount()); - - //verify a consumer going suspended->suspended doesn't change the count - consumer1.setState(ConsumerTarget.State.SUSPENDED); - assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount()); - - consumer1.setState(ConsumerTarget.State.ACTIVE); - assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); - - //verify a consumer going active->closed decreases the count - consumer1.setState(ConsumerTarget.State.CLOSED); - assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount()); - - } public void testNotificationFiredOnEnqueue() throws Exception { @@ -1170,12 +910,12 @@ public class SimpleAMQQueueTest extends QpidTestCase * @param messageNumber * number of messages to put into queue */ - private List<QueueEntry> enqueueGivenNumberOfMessages(AMQQueue queue, int messageNumber) + protected List<E> enqueueGivenNumberOfMessages(Q queue, int messageNumber) { putGivenNumberOfMessages(queue, messageNumber); // make sure that all enqueued messages are on the queue - List<QueueEntry> entries = queue.getMessagesOnTheQueue(); + List<E> entries = queue.getMessagesOnTheQueue(); assertEquals(messageNumber, entries.size()); for (int i = 0; i < messageNumber; i++) { @@ -1196,16 +936,15 @@ public class SimpleAMQQueueTest extends QpidTestCase * @param queue * @param messageNumber */ - private void putGivenNumberOfMessages(AMQQueue queue, int messageNumber) + protected <T extends SimpleAMQQueue> void putGivenNumberOfMessages(T queue, int messageNumber) { for (int i = 0; i < messageNumber; i++) { // Create message - Long messageId = new Long(i); ServerMessage message = null; try { - message = createMessage(messageId); + message = createMessage((long)i); } catch (AMQException e) { @@ -1239,7 +978,7 @@ public class SimpleAMQQueueTest extends QpidTestCase * @param dequeueMessageIndex * entry index to dequeue. */ - private QueueEntry dequeueMessage(AMQQueue queue, int dequeueMessageIndex) + protected QueueEntry dequeueMessage(AMQQueue queue, int dequeueMessageIndex) { List<QueueEntry> entries = queue.getMessagesOnTheQueue(); QueueEntry entry = entries.get(dequeueMessageIndex); @@ -1259,7 +998,7 @@ public class SimpleAMQQueueTest extends QpidTestCase return entriesList; } - private void verifyReceivedMessages(List<MessageInstance> expected, + protected void verifyReceivedMessages(List<MessageInstance> expected, List<MessageInstance> delivered) { assertEquals("Consumer did not receive the expected number of messages", @@ -1272,11 +1011,16 @@ public class SimpleAMQQueueTest extends QpidTestCase } } - public SimpleAMQQueue getQueue() + public Q getQueue() { return _queue; } + protected void setQueue(Q queue) + { + _queue = queue; + } + public MockConsumer getConsumer() { return _consumerTarget; @@ -1310,7 +1054,7 @@ public class SimpleAMQQueueTest extends QpidTestCase return message; } - private static class EntryListAddingAction implements Action<MessageInstance<? extends Consumer>> + private static class EntryListAddingAction implements Action<MessageInstance<?,? extends Consumer>> { private final ArrayList<QueueEntry> _queueEntries; @@ -1319,25 +1063,122 @@ public class SimpleAMQQueueTest extends QpidTestCase _queueEntries = queueEntries; } - public void performAction(MessageInstance<? extends Consumer> entry) + public void performAction(MessageInstance<?,? extends Consumer> entry) { _queueEntries.add((QueueEntry) entry); } } - class TestSimpleQueueEntryListFactory implements QueueEntryListFactory + + public VirtualHost getVirtualHost() { - QueueEntryList _list; + return _virtualHost; + } + + public String getQname() + { + return _qname; + } + + public String getOwner() + { + return _owner; + } + + public String getRoutingKey() + { + return _routingKey; + } + + public DirectExchange getExchange() + { + return _exchange; + } + + public MockConsumer getConsumerTarget() + { + return _consumerTarget; + } + - public QueueEntryList createQueueEntryList(AMQQueue queue) + static class TestSimpleQueueEntryListFactory implements QueueEntryListFactory<NonAsyncDeliverEntry, NonAsyncDeliverQueue, NonAsyncDeliverList> + { + + @Override + public NonAsyncDeliverList createQueueEntryList(final NonAsyncDeliverQueue queue) + { + return new NonAsyncDeliverList(queue); + } + } + + private static class NonAsyncDeliverEntry extends OrderedQueueEntry<NonAsyncDeliverEntry, NonAsyncDeliverQueue, NonAsyncDeliverList> + { + + public NonAsyncDeliverEntry(final NonAsyncDeliverList queueEntryList) + { + super(queueEntryList); + } + + public NonAsyncDeliverEntry(final NonAsyncDeliverList queueEntryList, + final ServerMessage message, + final long entryId) + { + super(queueEntryList, message, entryId); + } + + public NonAsyncDeliverEntry(final NonAsyncDeliverList queueEntryList, final ServerMessage message) + { + super(queueEntryList, message); + } + } + + private static class NonAsyncDeliverList extends OrderedQueueEntryList<NonAsyncDeliverEntry, NonAsyncDeliverQueue, NonAsyncDeliverList> + { + + private static final HeadCreator<NonAsyncDeliverEntry, NonAsyncDeliverQueue, NonAsyncDeliverList> HEAD_CREATOR = + new HeadCreator<NonAsyncDeliverEntry, NonAsyncDeliverQueue, NonAsyncDeliverList>() + { + + @Override + public NonAsyncDeliverEntry createHead(final NonAsyncDeliverList list) + { + return new NonAsyncDeliverEntry(list); + } + }; + + public NonAsyncDeliverList(final NonAsyncDeliverQueue queue) + { + super(queue, HEAD_CREATOR); + } + + @Override + protected NonAsyncDeliverEntry createQueueEntry(final ServerMessage<?> message) + { + return new NonAsyncDeliverEntry(this,message); + } + } + + + private static class NonAsyncDeliverQueue extends SimpleAMQQueue<NonAsyncDeliverEntry, NonAsyncDeliverQueue, NonAsyncDeliverList> + { + public NonAsyncDeliverQueue(final TestSimpleQueueEntryListFactory factory, VirtualHost vhost) { - _list = new SimpleQueueEntryList(queue); - return _list; + super(UUIDGenerator.generateRandomUUID(), + "testQueue", + false, + "testOwner", + false, + false, + vhost, + factory, + null); } - public QueueEntryList getQueueEntryList() + @Override + public void deliverAsync(QueueConsumer sub) { - return _list; + // do nothing, i.e prevent deliveries by the SubFlushRunner + // when registering the new consumers } } } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java index 11ff7ed192..36425761be 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java @@ -21,8 +21,14 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import java.util.UUID; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -30,7 +36,20 @@ import static org.mockito.Mockito.when; public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase { - private SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test")); + private OrderedQueueEntryList queueEntryList; + + @Override + public void setUp() throws Exception + { + mockLogging(); + + StandardQueue queue = new StandardQueue(UUID.randomUUID(), "SimpleQueueEntryImplTest", false, null,false, false, mock(VirtualHost.class),null); + + queueEntryList = queue.getEntries(); + + super.setUp(); + } + public QueueEntryImpl getQueueEntryImpl(int msgId) throws AMQException { diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java index 7add2d4d43..b6b3843ad2 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java @@ -21,20 +21,26 @@ package org.apache.qpid.server.queue; import java.util.Collections; import org.apache.qpid.AMQException; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Arrays; +import java.util.UUID; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class SortedQueueEntryListTest extends QueueEntryListTestBase +public class SortedQueueEntryListTest extends QueueEntryListTestBase<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>> { private static SelfValidatingSortedQueueEntryList _sqel; + public final static String keys[] = { " 73", " 18", " 11", "127", "166", "163", " 69", " 60", "191", "144", " 17", "161", "145", "140", "157", " 47", "136", " 56", "176", " 81", "195", " 96", " 2", " 68", "101", "141", "159", "187", "149", " 45", @@ -62,16 +68,30 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase private final static String keysSorted[] = keys.clone(); + private SortedQueue _testQueue; + @Override protected void setUp() throws Exception { + mockLogging(); + + // Create test list + _testQueue = new SortedQueue(UUID.randomUUID(), getName(), false, null, false,false, mock(VirtualHost.class), null, "KEY", new QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>() + { + + @Override + public SortedQueueEntryList createQueueEntryList(final SortedQueue queue) + { + return new SelfValidatingSortedQueueEntryList(queue, "KEY"); + } + }); + _sqel = (SelfValidatingSortedQueueEntryList) _testQueue.getEntries(); + super.setUp(); // Create result array Arrays.sort(keysSorted); - // Create test list - _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY"); // Build test list long messageId = 0L; @@ -83,14 +103,22 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase } + protected void mockLogging() + { + final LogActor logActor = mock(LogActor.class); + when(logActor.getRootMessageLogger()).thenReturn(mock(RootMessageLogger.class)); + CurrentActor.setDefault(logActor); + } + + @Override - public QueueEntryList getTestList() + public SortedQueueEntryList getTestList() { return getTestList(false); } @Override - public QueueEntryList getTestList(boolean newList) + public SortedQueueEntryList getTestList(boolean newList) { if(newList) { @@ -117,6 +145,12 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase return generateTestMessage(1, "test value"); } + @Override + protected SortedQueue getTestQueue() + { + return _testQueue; + } + private ServerMessage generateTestMessage(final long id, final String keyValue) throws AMQException { final ServerMessage message = mock(ServerMessage.class); @@ -138,7 +172,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase super.testIterator(); // Test sorted order of list - final QueueEntryIterator<?> iter = getTestList().iterator(); + final QueueEntryIterator<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>> iter = getTestList().iterator(); int count = 0; while(iter.advance()) { @@ -147,12 +181,12 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase } } - private Object getSortedKeyValue(QueueEntryIterator<?> iter) + private Object getSortedKeyValue(QueueEntryIterator<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>> iter) { return (iter.getNode()).getMessage().getMessageHeader().getHeader("KEY"); } - private Long getMessageId(QueueEntryIterator<?> iter) + private Long getMessageId(QueueEntryIterator<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>> iter) { return (iter.getNode()).getMessage().getMessageNumber(); } @@ -169,7 +203,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase _sqel.add(msg); } - final QueueEntryIterator<?> iter = getTestList().iterator(); + final QueueEntryIterator<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>> iter = getTestList().iterator(); int count=0; while(iter.advance()) { @@ -190,12 +224,13 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase _sqel.add(msg); } - final QueueEntryIterator<?> iter = getTestList().iterator(); + final QueueEntryIterator<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>> iter = getTestList().iterator(); int count=0; while(iter.advance()) { assertNull("Sorted queue entry value is not as expected", getSortedKeyValue(iter)); - assertEquals("Message id not as expected", Long.valueOf(count++), getMessageId(iter)); } + assertEquals("Message id not as expected", Long.valueOf(count++), getMessageId(iter)); + } } public void testAscendingSortKeys() throws Exception @@ -211,7 +246,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase _sqel.add(msg); } - final QueueEntryIterator<?> iter = getTestList().iterator(); + final QueueEntryIterator<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>> iter = getTestList().iterator(); int count=0; while(iter.advance()) { @@ -234,7 +269,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase _sqel.add(msg); } - final QueueEntryIterator<?> iter = getTestList().iterator(); + final QueueEntryIterator<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>> iter = getTestList().iterator(); int count=0; while(iter.advance()) { @@ -251,7 +286,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase ServerMessage msg = generateTestMessage(1, "A"); _sqel.add(msg); - SortedQueueEntryImpl entry = _sqel.next(_sqel.getHead()); + SortedQueueEntry entry = _sqel.next(_sqel.getHead()); validateEntry(entry, "A", 1); msg = generateTestMessage(2, "B"); @@ -271,7 +306,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase ServerMessage msg = generateTestMessage(1, "B"); _sqel.add(msg); - SortedQueueEntryImpl entry = _sqel.next(_sqel.getHead()); + SortedQueueEntry entry = _sqel.next(_sqel.getHead()); validateEntry(entry, "B", 1); msg = generateTestMessage(2, "A"); @@ -290,7 +325,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase ServerMessage msg = generateTestMessage(1, "A"); _sqel.add(msg); - SortedQueueEntryImpl entry = _sqel.next(_sqel.getHead()); + SortedQueueEntry entry = _sqel.next(_sqel.getHead()); validateEntry(entry, "A", 1); msg = generateTestMessage(2, "C"); @@ -322,7 +357,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase ServerMessage msg = generateTestMessage(1, "B"); _sqel.add(msg); - SortedQueueEntryImpl entry = _sqel.next(_sqel.getHead()); + SortedQueueEntry entry = _sqel.next(_sqel.getHead()); validateEntry(entry, "B", 1); msg = generateTestMessage(2, "D"); @@ -362,7 +397,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase validateEntry(entry, "D", 2); } - private void validateEntry(final SortedQueueEntryImpl entry, final String expectedSortKey, final long expectedMessageId) + private void validateEntry(final SortedQueueEntry entry, final String expectedSortKey, final long expectedMessageId) { assertEquals("Sorted queue entry value is not as expected", expectedSortKey, entry.getMessage().getMessageHeader().getHeader("KEY")); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java index a84dd6c249..a406c1c26f 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java @@ -20,21 +20,41 @@ package org.apache.qpid.server.queue; import java.util.Collections; +import java.util.UUID; + import org.apache.qpid.AMQException; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.virtualhost.VirtualHost; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class SortedQueueEntryImplTest extends QueueEntryImplTestBase +public class SortedQueueEntryTest extends QueueEntryImplTestBase { public final static String keys[] = { "CCC", "AAA", "BBB" }; - private SelfValidatingSortedQueueEntryList queueEntryList = new SelfValidatingSortedQueueEntryList(new MockAMQQueue("test"),"KEY"); + private SelfValidatingSortedQueueEntryList _queueEntryList; + + @Override + public void setUp() throws Exception + { + mockLogging(); + SortedQueue queue = new SortedQueue(UUID.randomUUID(), getName(), false, null, false,false, mock(VirtualHost.class), null, "KEY", new QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>() + { + + @Override + public SortedQueueEntryList createQueueEntryList(final SortedQueue queue) + { + return new SelfValidatingSortedQueueEntryList(queue, "KEY"); + } + }); + _queueEntryList = (SelfValidatingSortedQueueEntryList) queue.getEntries(); + super.setUp(); + } public QueueEntryImpl getQueueEntryImpl(int msgId) throws AMQException { @@ -48,7 +68,7 @@ public class SortedQueueEntryImplTest extends QueueEntryImplTestBase final MessageReference reference = mock(MessageReference.class); when(reference.getMessage()).thenReturn(message); when(message.newReference()).thenReturn(reference); - return queueEntryList.add(message); + return _queueEntryList.add(message); } public void testCompareTo() diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java index ae282d5f37..c053957e2a 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java @@ -23,16 +23,21 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.virtualhost.VirtualHost; +import java.util.Collections; import java.util.Map; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class SimpleQueueEntryListTest extends QueueEntryListTestBase +public class StandardQueueEntryListTest extends QueueEntryListTestBase<StandardQueueEntry, StandardQueue, StandardQueueEntryList, QueueConsumer<?,StandardQueueEntry, StandardQueue, StandardQueueEntryList>> { - private SimpleQueueEntryList _sqel; + + private StandardQueue _testQueue; + private StandardQueueEntryList _sqel; private static final String SCAVENGE_PROP = "qpid.queue.scavenge_count"; private String oldScavengeValue = null; @@ -41,7 +46,10 @@ public class SimpleQueueEntryListTest extends QueueEntryListTestBase protected void setUp() { oldScavengeValue = System.setProperty(SCAVENGE_PROP, "9"); - _sqel = new SimpleQueueEntryList(_testQueue); + _testQueue = new StandardQueue(UUID.randomUUID(),getName(),false,null,false,false,mock(VirtualHost.class), + Collections.<String,Object>emptyMap()); + + _sqel = _testQueue.getEntries(); for(int i = 1; i <= 100; i++) { final ServerMessage message = mock(ServerMessage.class); @@ -69,17 +77,21 @@ public class SimpleQueueEntryListTest extends QueueEntryListTestBase } @Override - public QueueEntryList getTestList() + public StandardQueueEntryList getTestList() { return getTestList(false); } @Override - public QueueEntryList getTestList(boolean newList) + public StandardQueueEntryList getTestList(boolean newList) { if(newList) { - return new SimpleQueueEntryList(_testQueue); + StandardQueue queue = + new StandardQueue(UUID.randomUUID(), getName(), false, null, false, false, mock(VirtualHost.class), + Collections.<String, Object>emptyMap()); + + return queue.getEntries(); } else { @@ -107,9 +119,15 @@ public class SimpleQueueEntryListTest extends QueueEntryListTestBase return msg; } + @Override + protected StandardQueue getTestQueue() + { + return _testQueue; + } + public void testScavenge() throws Exception { - SimpleQueueEntryList sqel = new SimpleQueueEntryList(null); + OrderedQueueEntryList sqel = new StandardQueueEntryList(null); ConcurrentHashMap<Integer,QueueEntry> entriesMap = new ConcurrentHashMap<Integer,QueueEntry>(); @@ -126,7 +144,7 @@ public class SimpleQueueEntryListTest extends QueueEntryListTestBase entriesMap.put(i,bleh); } - SimpleQueueEntryImpl head = sqel.getHead(); + OrderedQueueEntry head = sqel.getHead(); //We shall now delete some specific messages mid-queue that will lead to //requiring a scavenge once the requested threshold of 9 deletes is passed @@ -172,10 +190,10 @@ public class SimpleQueueEntryListTest extends QueueEntryListTestBase return entry.isDeleted() && !wasDeleted; } - private void verifyDeletedButPresentBeforeScavenge(SimpleQueueEntryImpl head, long messageId) + private void verifyDeletedButPresentBeforeScavenge(OrderedQueueEntry head, long messageId) { //Use the head to get the initial entry in the queue - SimpleQueueEntryImpl entry = head.getNextNode(); + OrderedQueueEntry entry = head.getNextNode(); for(long i = 1; i < messageId ; i++) { @@ -186,10 +204,10 @@ public class SimpleQueueEntryListTest extends QueueEntryListTestBase assertTrue("Entry should have been deleted", entry.isDeleted()); } - private void verifyAllDeletedMessagedNotPresent(SimpleQueueEntryImpl head, Map<Integer,QueueEntry> remainingMessages) + private void verifyAllDeletedMessagedNotPresent(OrderedQueueEntry head, Map<Integer,QueueEntry> remainingMessages) { //Use the head to get the initial entry in the queue - SimpleQueueEntryImpl entry = head.getNextNode(); + OrderedQueueEntry entry = head.getNextNode(); assertNotNull("Initial entry should not have been null", entry); @@ -211,8 +229,8 @@ public class SimpleQueueEntryListTest extends QueueEntryListTestBase public void testGettingNextElement() { final int numberOfEntries = 5; - final SimpleQueueEntryImpl[] entries = new SimpleQueueEntryImpl[numberOfEntries]; - final SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test")); + final OrderedQueueEntry[] entries = new OrderedQueueEntry[numberOfEntries]; + final OrderedQueueEntryList queueEntryList = getTestList(true); // create test entries for(int i = 0; i < numberOfEntries; i++) @@ -228,7 +246,7 @@ public class SimpleQueueEntryListTest extends QueueEntryListTestBase // test getNext for not acquired entries for(int i = 0; i < numberOfEntries; i++) { - final SimpleQueueEntryImpl next = entries[i].getNextValidEntry(); + final OrderedQueueEntry next = entries[i].getNextValidEntry(); if(i < numberOfEntries - 1) { @@ -248,7 +266,7 @@ public class SimpleQueueEntryListTest extends QueueEntryListTestBase entries[2].acquire(); entries[2].delete(); - SimpleQueueEntryImpl next = entries[2].getNextValidEntry(); + OrderedQueueEntry next = entries[2].getNextValidEntry(); assertEquals("expected forth entry", entries[3], next); next = next.getNextValidEntry(); assertEquals("expected fifth entry", entries[4], next); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java new file mode 100644 index 0000000000..081b1d30ad --- /dev/null +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java @@ -0,0 +1,363 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerTarget; +import org.apache.qpid.server.consumer.MockConsumer; +import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import java.util.Arrays; +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class StandardQueueTest extends SimpleAMQQueueTestBase<StandardQueueEntry, StandardQueue, StandardQueueEntryList> +{ + + public void testCreationFailsWithNoVhost() + { + try + { + setQueue(new StandardQueue(UUIDGenerator.generateRandomUUID(), getQname(), false, getOwner(), false,false, null, getArguments())); + assertNull("Queue was created", getQueue()); + } + catch (IllegalArgumentException e) + { + assertTrue("Exception was not about missing vhost", + e.getMessage().contains("Host")); + } + } + + + public void testAutoDeleteQueue() throws Exception + { + getQueue().stop(); + setQueue(new StandardQueue(UUIDGenerator.generateRandomUUID(), getQname(), false, null, true, false, getVirtualHost(), Collections.<String,Object>emptyMap())); + getQueue().setDeleteOnNoConsumers(true); + + ServerMessage message = createMessage(25l); + QueueConsumer consumer = + getQueue().addConsumer(getConsumerTarget(), null, message.getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); + + getQueue().enqueue(message, null); + consumer.close(); + assertTrue("Queue was not deleted when consumer was removed", + getQueue().isDeleted()); + } + + public void testActiveConsumerCount() throws Exception + { + final StandardQueue queue = new StandardQueue(UUIDGenerator.generateRandomUUID(), "testActiveConsumerCount", false, + "testOwner", false, false, getVirtualHost(), null); + + //verify adding an active consumer increases the count + final MockConsumer consumer1 = new MockConsumer(); + consumer1.setActive(true); + consumer1.setState(ConsumerTarget.State.ACTIVE); + assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount()); + queue.addConsumer(consumer1, + null, + createMessage(-1l).getClass(), + "test", + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); + assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); + + //verify adding an inactive consumer doesn't increase the count + final MockConsumer consumer2 = new MockConsumer(); + consumer2.setActive(false); + consumer2.setState(ConsumerTarget.State.SUSPENDED); + assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); + queue.addConsumer(consumer2, + null, + createMessage(-1l).getClass(), + "test", + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); + assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); + + //verify behaviour in face of expected state changes: + + //verify a consumer going suspended->active increases the count + consumer2.setState(ConsumerTarget.State.ACTIVE); + assertEquals("Unexpected active consumer count", 2, queue.getActiveConsumerCount()); + + //verify a consumer going active->suspended decreases the count + consumer2.setState(ConsumerTarget.State.SUSPENDED); + assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); + + //verify a consumer going suspended->closed doesn't change the count + consumer2.setState(ConsumerTarget.State.CLOSED); + assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); + + //verify a consumer going active->active doesn't change the count + consumer1.setState(ConsumerTarget.State.ACTIVE); + assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); + + consumer1.setState(ConsumerTarget.State.SUSPENDED); + assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount()); + + //verify a consumer going suspended->suspended doesn't change the count + consumer1.setState(ConsumerTarget.State.SUSPENDED); + assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount()); + + consumer1.setState(ConsumerTarget.State.ACTIVE); + assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount()); + + //verify a consumer going active->closed decreases the count + consumer1.setState(ConsumerTarget.State.CLOSED); + assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount()); + + } + + + /** + * Tests that entry in dequeued state are not enqueued and not delivered to consumer + */ + public void testEnqueueDequeuedEntry() + { + // create a queue where each even entry is considered a dequeued + SimpleAMQQueue queue = new DequeuedQueue(UUIDGenerator.generateRandomUUID(), "test", false, + "testOwner", false, false, getVirtualHost(), null); + // create a consumer + MockConsumer consumer = new MockConsumer(); + + // register consumer + try + { + queue.addConsumer(consumer, + null, + createMessage(-1l).getClass(), + "test", + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); + } + catch (AMQException e) + { + fail("Failure to register consumer:" + e.getMessage()); + } + + // put test messages into a queue + putGivenNumberOfMessages(queue, 4); + + // assert received messages + List<MessageInstance> messages = consumer.getMessages(); + assertEquals("Only 2 messages should be returned", 2, messages.size()); + assertEquals("ID of first message should be 1", 1l, + (messages.get(0).getMessage()).getMessageNumber()); + assertEquals("ID of second message should be 3", 3l, + (messages.get(1).getMessage()).getMessageNumber()); + } + + /** + * Tests whether dequeued entry is sent to subscriber in result of + * invocation of {@link SimpleAMQQueue#processQueue(QueueRunner)} + */ + public void testProcessQueueWithDequeuedEntry() + { + // total number of messages to send + int messageNumber = 4; + int dequeueMessageIndex = 1; + + // create queue with overridden method deliverAsync + StandardQueue testQueue = new StandardQueue(UUIDGenerator.generateRandomUUID(), "test", + false, "testOwner", false, false, getVirtualHost(), null) + { + @Override + public void deliverAsync(QueueConsumer sub) + { + // do nothing + } + }; + + // put messages + List<StandardQueueEntry> entries = enqueueGivenNumberOfMessages(testQueue, messageNumber); + + // dequeue message + dequeueMessage(testQueue, dequeueMessageIndex); + + // latch to wait for message receipt + final CountDownLatch latch = new CountDownLatch(messageNumber -1); + + // create a consumer + MockConsumer consumer = new MockConsumer() + { + /** + * Send a message and decrement latch + * @param entry + * @param batch + */ + public void send(MessageInstance entry, boolean batch) throws AMQException + { + super.send(entry, batch); + latch.countDown(); + } + }; + + try + { + // subscribe + testQueue.addConsumer(consumer, + null, + entries.get(0).getMessage().getClass(), + "test", + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); + + // process queue + testQueue.processQueue(new QueueRunner(testQueue) + { + public void run() + { + // do nothing + } + }); + } + catch (AMQException e) + { + fail("Failure to process queue:" + e.getMessage()); + } + // wait up to 1 minute for message receipt + try + { + latch.await(1, TimeUnit.MINUTES); + } + catch (InterruptedException e1) + { + Thread.currentThread().interrupt(); + } + List<MessageInstance> expected = Arrays.asList((MessageInstance) entries.get(0), entries.get(2), entries.get(3)); + verifyReceivedMessages(expected, consumer.getMessages()); + } + + + private static class DequeuedQueue extends SimpleAMQQueue<DequeuedQueueEntry, DequeuedQueue, DequeuedQueueEntryList> + { + + public DequeuedQueue(final UUID id, + final String queueName, + final boolean durable, + final String owner, + final boolean autoDelete, + final boolean exclusive, + final VirtualHost virtualHost, + final Map<String, Object> arguments) + { + super(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, new DequeuedQueueEntryListFactory(), arguments); + } + } + private static class DequeuedQueueEntryListFactory implements QueueEntryListFactory<DequeuedQueueEntry, DequeuedQueue, DequeuedQueueEntryList> + { + public DequeuedQueueEntryList createQueueEntryList(DequeuedQueue queue) + { + /** + * Override SimpleQueueEntryList to create a dequeued + * entries for messages with even id + */ + return new DequeuedQueueEntryList(queue); + } + + + } + + private static class DequeuedQueueEntryList extends OrderedQueueEntryList<DequeuedQueueEntry, DequeuedQueue, DequeuedQueueEntryList> + { + private static final HeadCreator<DequeuedQueueEntry,DequeuedQueue,DequeuedQueueEntryList> HEAD_CREATOR = + new HeadCreator<DequeuedQueueEntry,DequeuedQueue,DequeuedQueueEntryList>() + { + + @Override + public DequeuedQueueEntry createHead(final DequeuedQueueEntryList list) + { + return new DequeuedQueueEntry(list); + } + }; + + public DequeuedQueueEntryList(final DequeuedQueue queue) + { + super(queue, HEAD_CREATOR); + } + + /** + * Entries with even message id are considered + * dequeued! + */ + protected DequeuedQueueEntry createQueueEntry(final ServerMessage message) + { + return new DequeuedQueueEntry(this, message); + } + + + } + + private static class DequeuedQueueEntry extends OrderedQueueEntry<DequeuedQueueEntry,DequeuedQueue,DequeuedQueueEntryList> + { + + private final ServerMessage _message; + + private DequeuedQueueEntry(final DequeuedQueueEntryList queueEntryList) + { + super(queueEntryList); + _message = null; + } + + public DequeuedQueueEntry(DequeuedQueueEntryList list, final ServerMessage message) + { + super(list, message); + _message = message; + } + + public boolean isDeleted() + { + return (_message.getMessageNumber() % 2 == 0); + } + + public boolean isAvailable() + { + return !(_message.getMessageNumber() % 2 == 0); + } + + @Override + public boolean acquire(QueueConsumer sub) + { + if(_message.getMessageNumber() % 2 == 0) + { + return false; + } + else + { + return super.acquire(sub); + } + } + } +} diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java index 11b9bbe1b4..993e9ee4a8 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java @@ -24,9 +24,9 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MockAMQQueue; -import org.apache.qpid.server.queue.MockQueueEntry; -import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.queue.MockMessageInstance; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState; import org.apache.qpid.test.utils.QpidTestCase; @@ -385,7 +385,7 @@ public class AutoCommitTransactionTest extends QpidTestCase final AMQQueue queue = createTestAMQQueue(queueDurableFlags[i]); final ServerMessage message = createTestMessage(messagePersistentFlags[i]); - queueEntries.add(new MockQueueEntry() + queueEntries.add(new MockMessageInstance() { @Override @@ -395,7 +395,7 @@ public class AutoCommitTransactionTest extends QpidTestCase } @Override - public AMQQueue getQueue() + public TransactionLogResource getOwningResource() { return queue; } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java index 80e794e0ff..bdfdb55c7e 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java @@ -24,9 +24,9 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MockAMQQueue; -import org.apache.qpid.server.queue.MockQueueEntry; -import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.queue.MockMessageInstance; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState; import org.apache.qpid.test.utils.QpidTestCase; @@ -609,7 +609,7 @@ public class LocalTransactionTest extends QpidTestCase final AMQQueue queue = createTestAMQQueue(queueDurableFlags[i]); final ServerMessage message = createTestMessage(messagePersistentFlags[i]); - queueEntries.add(new MockQueueEntry() + queueEntries.add(new MockMessageInstance() { @Override @@ -619,7 +619,7 @@ public class LocalTransactionTest extends QpidTestCase } @Override - public AMQQueue getQueue() + public TransactionLogResource getOwningResource() { return queue; } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java index cb1fc2737d..ed1ea01108 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java @@ -44,8 +44,8 @@ import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.actors.TestLogActor; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; -import org.apache.qpid.server.queue.SimpleAMQQueue; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.stats.StatisticsGatherer; @@ -179,9 +179,9 @@ public class BrokerTestHelper return factory.createExchange("amp.direct", "direct", false, false); } - public static SimpleAMQQueue createQueue(String queueName, VirtualHost virtualHost) throws AMQException + public static AMQQueue createQueue(String queueName, VirtualHost virtualHost) throws AMQException { - SimpleAMQQueue queue = (SimpleAMQQueue) virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), queueName, false, null, + AMQQueue queue = virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), queueName, false, null, false, false, false, Collections.<String, Object>emptyMap()); return queue; } diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 53022c333e..87a02b99c1 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -928,10 +928,10 @@ public class ServerSession extends Session return getId().compareTo(o.getId()); } - private class CheckCapacityAction<C extends Consumer> implements Action<MessageInstance<C>> + private class CheckCapacityAction<C extends Consumer> implements Action<MessageInstance<?,C>> { @Override - public void performAction(final MessageInstance<C> entry) + public void performAction(final MessageInstance<?,C> entry) { TransactionLogResource queue = entry.getOwningResource(); if(queue instanceof CapacityChecker) diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index aa465d373f..162951f9ef 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -1189,14 +1189,14 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } - private class ImmediateAction<C extends Consumer> implements Action<MessageInstance<C>> + private class ImmediateAction<C extends Consumer> implements Action<MessageInstance<?,C>> { public ImmediateAction() { } - public void performAction(MessageInstance<C> entry) + public void performAction(MessageInstance<?,C> entry) { TransactionLogResource queue = entry.getOwningResource(); @@ -1261,10 +1261,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } } - private final class CapacityCheckAction<C extends Consumer> implements Action<MessageInstance<C>> + private final class CapacityCheckAction<C extends Consumer> implements Action<MessageInstance<?,C>> { @Override - public void performAction(final MessageInstance<C> entry) + public void performAction(final MessageInstance<?,C> entry) { TransactionLogResource queue = entry.getOwningResource(); if(queue instanceof CapacityChecker) diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java index 281f7345ff..8aa25e8eb5 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java @@ -24,7 +24,7 @@ package org.apache.qpid.server.protocol.v0_8; import org.apache.qpid.AMQException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.queue.SimpleAMQQueue; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.util.BrokerTestHelper; @@ -36,7 +36,7 @@ import java.util.List; public class AcknowledgeTest extends QpidTestCase { private AMQChannel _channel; - private SimpleAMQQueue _queue; + private AMQQueue _queue; private MessageStore _messageStore; private String _queueName; @@ -79,7 +79,7 @@ public class AcknowledgeTest extends QpidTestCase return (InternalTestProtocolSession)_channel.getProtocolSession(); } - private SimpleAMQQueue getQueue() + private AMQQueue getQueue() { return _queue; } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java index e895f81c44..0a4bfd13f1 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java @@ -26,10 +26,8 @@ import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.SimpleAMQQueue; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; @@ -39,7 +37,7 @@ import java.util.List; public class QueueBrowserUsesNoAckTest extends QpidTestCase { private AMQChannel _channel; - private SimpleAMQQueue _queue; + private AMQQueue _queue; private MessageStore _messageStore; private String _queueName; @@ -82,7 +80,7 @@ public class QueueBrowserUsesNoAckTest extends QpidTestCase return (InternalTestProtocolSession)_channel.getProtocolSession(); } - private SimpleAMQQueue getQueue() + private AMQQueue getQueue() { return _queue; } diff --git a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageContentServlet.java b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageContentServlet.java index 4d85d52997..0329379713 100644 --- a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageContentServlet.java +++ b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageContentServlet.java @@ -19,9 +19,6 @@ package org.apache.qpid.server.management.plugin.servlet.rest; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java index b408ad8ad1..b5a4166b55 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java @@ -64,7 +64,7 @@ public class FieldTableKeyEnumeratorTest extends TestCase } - public void testPropertEnu() + public void testPropertyEnum() { try { diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java b/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java index f92a133919..de36c6e413 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -47,11 +47,10 @@ import org.apache.qpid.server.protocol.v0_8.MessageMetaData; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; -import org.apache.qpid.server.queue.AMQPriorityQueue; +import org.apache.qpid.server.queue.PriorityQueue; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.ConflationQueue; -import org.apache.qpid.server.queue.SimpleAMQQueue; +import org.apache.qpid.server.queue.StandardQueue; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.BrokerTestHelper; @@ -573,9 +572,9 @@ public class MessageStoreTest extends QpidTestCase if (usePriority) { - assertEquals("Queue is no longer a Priority Queue", AMQPriorityQueue.class, queue.getClass()); + assertEquals("Queue is no longer a Priority Queue", PriorityQueue.class, queue.getClass()); assertEquals("Priority Queue does not have set priorities", - DEFAULT_PRIORTY_LEVEL, ((AMQPriorityQueue) queue).getPriorities()); + DEFAULT_PRIORTY_LEVEL, ((PriorityQueue) queue).getPriorities()); } else if (lastValueQueue) { @@ -584,7 +583,7 @@ public class MessageStoreTest extends QpidTestCase } else { - assertEquals("Queue is not 'simple'", SimpleAMQQueue.class, queue.getClass()); + assertEquals("Queue is not 'simple'", StandardQueue.class, queue.getClass()); } assertEquals("Queue owner is not as expected", queueOwner, queue.getOwner()); diff --git a/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java b/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java index 2d6943f643..3c15a45203 100644 --- a/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java +++ b/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java @@ -26,11 +26,9 @@ import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.management.common.mbeans.ManagedBroker; import org.apache.qpid.management.common.mbeans.ManagedQueue; -import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.NotificationCheckTest; import org.apache.qpid.server.queue.QueueArgumentsConverter; -import org.apache.qpid.server.queue.SimpleAMQQueue; -import org.apache.qpid.server.queue.SimpleAMQQueueTest; +import org.apache.qpid.server.queue.StandardQueue; import org.apache.qpid.test.client.destination.AddressBasedDestinationTest; import org.apache.qpid.test.utils.JMXTestUtils; import org.apache.qpid.test.utils.QpidBrokerTestCase; @@ -661,7 +659,7 @@ public class QueueManagementTest extends QpidBrokerTestCase final Object messageGroupKey = "test"; final Map<String, Object> arguments = new HashMap<String, Object>(2); arguments.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY, messageGroupKey); - arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP, SimpleAMQQueue.SHARED_MSG_GROUP_ARG_VALUE); + arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP, StandardQueue.SHARED_MSG_GROUP_ARG_VALUE); managedBroker.createNewQueue(queueName, null, true, arguments); final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); |