summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-02-08 17:52:05 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-02-08 17:52:05 +0000
commitdb96490d3b7b9ea8643b4f9ce21efdbaaa221b39 (patch)
tree14e879ee092551d4315301bba2d582e7817dcb3d
parent977381169b74290411ae3c01f829262cf4c59dba (diff)
downloadqpid-python-db96490d3b7b9ea8643b4f9ce21efdbaaa221b39.tar.gz
Modify queue classes to use generics
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-amqp-1-0-management@1566069 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java5
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java5
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java5
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java8
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java14
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java38
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java3
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java26
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java4
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java27
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java4
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java4
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java65
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java48
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java14
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntry.java (renamed from java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java)24
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java198
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java14
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java (renamed from java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java)20
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java231
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java48
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java72
-rwxr-xr-xjava/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java20
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java9
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java51
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java6
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java16
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java4
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java6
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java299
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java220
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java22
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntry.java (renamed from java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryImpl.java)44
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java110
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java5
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java41
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntry.java45
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java57
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java4
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java12
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java16
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java4
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java12
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java78
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java (renamed from java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java)90
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java17
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java (renamed from java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java)5
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java19
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java50
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java22
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTestBase.java (renamed from java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java)451
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java21
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java71
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java (renamed from java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java)26
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java (renamed from java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java)50
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java363
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java8
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java8
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java6
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java4
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java8
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java6
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java6
-rw-r--r--java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageContentServlet.java3
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java11
-rw-r--r--java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java6
67 files changed, 1878 insertions, 1333 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index bc670bd848..600c60bdb3 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -43,6 +43,7 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
+import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -428,10 +429,10 @@ public abstract class AbstractExchange implements Exchange
return queues;
}
- public final int send(final ServerMessage message,
+ public final <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
final InstanceProperties instanceProperties,
final ServerTransaction txn,
- final Action<MessageInstance<? extends Consumer>> postEnqueueAction)
+ final Action<? super MessageInstance<?, ? extends Consumer>> postEnqueueAction)
{
List<? extends BaseQueue> queues = route(message, instanceProperties);
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
index 33c5218b4c..78b9664cd3 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
@@ -46,6 +46,7 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -335,10 +336,10 @@ public class DefaultExchange implements Exchange
return _id;
}
- public final int send(final ServerMessage message,
+ public final <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
final InstanceProperties instanceProperties,
final ServerTransaction txn,
- final Action<MessageInstance<? extends Consumer>> postEnqueueAction)
+ final Action<? super MessageInstance<?, ? extends Consumer>> postEnqueueAction)
{
final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey());
if(q == null)
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
index 967c629749..62f5b3634b 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.message;
import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
@@ -37,8 +38,8 @@ public interface MessageDestination extends MessageNode
* @param postEnqueueAction action to perform on the result of every enqueue (may be null)
* @return the number of queues in which the message was enqueued performed
*/
- int send(ServerMessage message,
+ <M extends ServerMessage<? extends StorableMessageMetaData>> int send(M message,
InstanceProperties instanceProperties,
ServerTransaction txn,
- Action<MessageInstance<? extends Consumer>> postEnqueueAction);
+ Action<? super MessageInstance<?,? extends Consumer>> postEnqueueAction);
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
index 97cb66cce4..49969ce3c1 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
@@ -30,7 +30,7 @@ import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.StateChangeListener;
-public interface MessageInstance<C extends Consumer>
+public interface MessageInstance<M extends MessageInstance<M,C>, C extends Consumer>
{
@@ -45,9 +45,9 @@ public interface MessageInstance<C extends Consumer>
void decrementDeliveryCount();
- void addStateChangeListener(StateChangeListener<MessageInstance<C>, State> listener);
+ void addStateChangeListener(StateChangeListener<? super M,State> listener);
- boolean removeStateChangeListener(StateChangeListener<MessageInstance<C>, State> listener);
+ boolean removeStateChangeListener(StateChangeListener<? super M, State> listener);
boolean acquiredByConsumer();
@@ -71,7 +71,7 @@ public interface MessageInstance<C extends Consumer>
int getMaximumDeliveryCount();
- int routeToAlternate(Action<MessageInstance<? extends Consumer>> action, ServerTransaction txn);
+ int routeToAlternate(Action<? super MessageInstance<?, ? extends Consumer>> action, ServerTransaction txn);
Filterable asFilterable();
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
index 06ff76f103..49b0f2995c 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
@@ -32,17 +32,17 @@ import org.apache.qpid.server.store.TransactionLogResource;
import java.util.Collection;
import java.util.EnumSet;
-public interface MessageSource<C extends Consumer> extends TransactionLogResource, MessageNode
+public interface MessageSource<C extends Consumer, S extends MessageSource<C,S>> extends TransactionLogResource, MessageNode
{
- C addConsumer(ConsumerTarget target, FilterManager filters,
+ <T extends ConsumerTarget> C addConsumer(T target, FilterManager filters,
Class<? extends ServerMessage> messageClass,
String consumerName, EnumSet<Consumer.Option> options) throws AMQException;
Collection<C> getConsumers();
- void addConsumerRegistrationListener(ConsumerRegistrationListener listener);
+ void addConsumerRegistrationListener(ConsumerRegistrationListener<S> listener);
- void removeConsumerRegistrationListener(ConsumerRegistrationListener listener);
+ void removeConsumerRegistrationListener(ConsumerRegistrationListener<S> listener);
AuthorizationHolder getAuthorizationHolder();
@@ -54,10 +54,10 @@ public interface MessageSource<C extends Consumer> extends TransactionLogResourc
boolean isExclusive();
- interface ConsumerRegistrationListener
+ interface ConsumerRegistrationListener<Q extends MessageSource<? extends Consumer,Q>>
{
- void consumerAdded(AMQQueue queue, Consumer consumer);
- void consumerRemoved(AMQQueue queue, Consumer consumer);
+ void consumerAdded(Q source, Consumer consumer);
+ void consumerRemoved(Q queue, Consumer consumer);
}
/**
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
index d59b13902b..f3d1c8a665 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
@@ -33,6 +33,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObjectFinder;
import org.apache.qpid.server.model.Exchange;
@@ -49,8 +50,9 @@ import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.util.MapValueConverter;
-final class QueueAdapter extends AbstractAdapter implements Queue,
- AMQQueue.ConsumerRegistrationListener, AMQQueue.NotificationListener
+final class QueueAdapter<Q extends AMQQueue<?,Q,?>> extends AbstractAdapter implements Queue,
+ MessageSource.ConsumerRegistrationListener<Q>,
+ AMQQueue.NotificationListener
{
@SuppressWarnings("serial")
static final Map<String, Type> ATTRIBUTE_TYPES = Collections.unmodifiableMap(new HashMap<String, Type>(){{
@@ -66,10 +68,11 @@ final class QueueAdapter extends AbstractAdapter implements Queue,
put(DESCRIPTION, String.class);
}});
- private final AMQQueue _queue;
+ private final AMQQueue<?,Q,?> _queue;
+
private final Map<Binding, BindingAdapter> _bindingAdapters =
new HashMap<Binding, BindingAdapter>();
- private Map<Consumer, ConsumerAdapter> _consumerAdapters =
+ private final Map<Consumer, ConsumerAdapter> _consumerAdapters =
new HashMap<Consumer, ConsumerAdapter>();
@@ -77,7 +80,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue,
private QueueStatisticsAdapter _statistics;
private QueueNotificationListener _queueNotificationListener;
- public QueueAdapter(final VirtualHostAdapter virtualHostAdapter, final AMQQueue queue)
+ public QueueAdapter(final VirtualHostAdapter virtualHostAdapter, final AMQQueue<?,Q,?> queue)
{
super(queue.getId(), virtualHostAdapter.getTaskExecutor());
_vhost = virtualHostAdapter;
@@ -124,11 +127,10 @@ final class QueueAdapter extends AbstractAdapter implements Queue,
private void populateConsumers()
{
- Collection<Consumer> actualConsumers = _queue.getConsumers();
+ Collection<? extends Consumer> actualConsumers = _queue.getConsumers();
synchronized (_consumerAdapters)
{
- Iterator<Consumer> iter = _consumerAdapters.keySet().iterator();
for(Consumer consumer : actualConsumers)
{
if(!_consumerAdapters.containsKey(consumer))
@@ -396,9 +398,10 @@ final class QueueAdapter extends AbstractAdapter implements Queue,
}
else if(LVQ_KEY.equals(name))
{
- if(_queue instanceof ConflationQueue)
+ AMQQueue queue = _queue;
+ if(queue instanceof ConflationQueue)
{
- return ((ConflationQueue)_queue).getConflationKey();
+ return ((ConflationQueue)queue).getConflationKey();
}
}
else if(MAXIMUM_DELIVERY_ATTEMPTS.equals(name))
@@ -427,22 +430,24 @@ final class QueueAdapter extends AbstractAdapter implements Queue,
}
else if(SORT_KEY.equals(name))
{
- if(_queue instanceof SortedQueue)
+ AMQQueue queue = _queue;
+ if(queue instanceof SortedQueue)
{
- return ((SortedQueue)_queue).getSortedPropertyName();
+ return ((SortedQueue)queue).getSortedPropertyName();
}
}
else if(TYPE.equals(name))
{
- if(_queue instanceof SortedQueue)
+ AMQQueue queue = _queue;
+ if(queue instanceof SortedQueue)
{
return "sorted";
}
- if(_queue instanceof ConflationQueue)
+ if(queue instanceof ConflationQueue)
{
return "lvq";
}
- if(_queue instanceof AMQPriorityQueue)
+ if(queue instanceof PriorityQueue)
{
return "priority";
}
@@ -486,9 +491,10 @@ final class QueueAdapter extends AbstractAdapter implements Queue,
}
else if(PRIORITIES.equals(name))
{
- if(_queue instanceof AMQPriorityQueue)
+ AMQQueue queue = _queue;
+ if(queue instanceof PriorityQueue)
{
- return ((AMQPriorityQueue)_queue).getPriorities();
+ return ((PriorityQueue)queue).getPriorities();
}
}
return super.getAttribute(name);
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
index 75994f6d81..28fadd4162 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
@@ -27,12 +27,11 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.SimpleAMQQueue;
/**
* Session model interface.
* Extends {@link Comparable} to allow objects to be inserted into a {@link ConcurrentSkipListSet}
- * when monitoring the blocking and blocking of queues/sessions in {@link SimpleAMQQueue}.
+ * when monitoring the blocking and blocking of queues/sessions in {@link AMQQueue}.
*/
public interface AMQSessionModel extends Comparable<AMQSessionModel>
{
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 62927edc29..76477a0a9b 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -37,8 +37,8 @@ import java.util.Collection;
import java.util.List;
import java.util.Set;
-public interface AMQQueue<C extends Consumer> extends Comparable<AMQQueue<C>>, ExchangeReferrer, BaseQueue,
- MessageSource<C>, CapacityChecker, MessageDestination
+public interface AMQQueue<E extends QueueEntry<E,Q,C>, Q extends AMQQueue<E,Q,C>, C extends Consumer>
+ extends Comparable<Q>, ExchangeReferrer, BaseQueue<C>, MessageSource<C,Q>, CapacityChecker, MessageDestination
{
public interface NotificationListener
@@ -87,41 +87,35 @@ public interface AMQQueue<C extends Consumer> extends Comparable<AMQQueue<C>>, E
int getMessageCount();
- int getUndeliveredMessageCount();
-
long getQueueDepth();
- long getReceivedMessageCount();
-
long getOldestMessageArrivalTime();
boolean isDeleted();
int delete() throws AMQException;
- void requeue(QueueEntry entry);
+ void requeue(E entry);
- void dequeue(QueueEntry entry, Consumer sub);
+ void dequeue(E entry);
- void decrementUnackedMsgCount(QueueEntry queueEntry);
+ void decrementUnackedMsgCount(E queueEntry);
- boolean resend(final QueueEntry entry, final Consumer consumer) throws AMQException;
+ boolean resend(final E entry, final C consumer) throws AMQException;
void addQueueDeleteTask(Action<AMQQueue> task);
void removeQueueDeleteTask(Action<AMQQueue> task);
- List<QueueEntry> getMessagesOnTheQueue();
-
- List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId);
+ List<E> getMessagesOnTheQueue();
List<Long> getMessagesOnTheQueue(int num);
List<Long> getMessagesOnTheQueue(int num, int offset);
- QueueEntry getMessageOnTheQueue(long messageId);
+ E getMessageOnTheQueue(long messageId);
/**
* Returns a list of QueEntries from a given range of queue positions, eg messages 5 to 10 on the queue.
@@ -132,9 +126,9 @@ public interface AMQQueue<C extends Consumer> extends Comparable<AMQQueue<C>>, E
* @param toPosition
* @return
*/
- public List<QueueEntry> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition);
+ public List<E> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition);
- void visit(QueueEntryVisitor visitor);
+ void visit(QueueEntryVisitor<E> visitor);
long getMaximumMessageSize();
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
index 38c205bc00..4e0a9048e1 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
@@ -288,11 +288,11 @@ public class AMQQueueFactory implements QueueFactory
}
else if(priorities > 1)
{
- q = new AMQPriorityQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, priorities);
+ q = new PriorityQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, priorities);
}
else
{
- q = new SimpleAMQQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments);
+ q = new StandardQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments);
}
q.setDeleteOnNoConsumers(deleteOnNoConsumer);
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java
index a9b36c1b24..efc82c0ab4 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java
@@ -28,7 +28,7 @@ import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
-public class AssignedConsumerMessageGroupManager implements MessageGroupManager
+public class AssignedConsumerMessageGroupManager<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> implements MessageGroupManager<E,Q,L>
{
private static final Logger _logger = LoggerFactory.getLogger(AssignedConsumerMessageGroupManager.class);
@@ -53,25 +53,18 @@ public class AssignedConsumerMessageGroupManager implements MessageGroupManager
return val;
}
- public QueueConsumer getAssignedConsumer(final QueueEntry entry)
+ public QueueConsumer getAssignedConsumer(final E entry)
{
Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
return groupVal == null ? null : _groupMap.get(groupVal.hashCode() & _groupMask);
}
- public boolean acceptMessage(QueueConsumer sub, QueueEntry entry)
+ public boolean acceptMessage(QueueConsumer<?,E,Q,L> sub, E entry)
{
- if(assignMessage(sub, entry))
- {
- return entry.acquire(sub);
- }
- else
- {
- return false;
- }
+ return assignMessage(sub, entry) && entry.acquire(sub);
}
- private boolean assignMessage(QueueConsumer sub, QueueEntry entry)
+ private boolean assignMessage(QueueConsumer sub, E entry)
{
Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
if(groupVal == null)
@@ -105,16 +98,16 @@ public class AssignedConsumerMessageGroupManager implements MessageGroupManager
}
}
- public QueueEntry findEarliestAssignedAvailableEntry(QueueConsumer sub)
+ public E findEarliestAssignedAvailableEntry(QueueConsumer sub)
{
EntryFinder visitor = new EntryFinder(sub);
sub.getQueue().visit(visitor);
return visitor.getEntry();
}
- private class EntryFinder implements QueueEntryVisitor
+ private class EntryFinder implements QueueEntryVisitor<E>
{
- private QueueEntry _entry;
+ private E _entry;
private QueueConsumer _sub;
public EntryFinder(final QueueConsumer sub)
@@ -122,7 +115,7 @@ public class AssignedConsumerMessageGroupManager implements MessageGroupManager
_sub = sub;
}
- public boolean visit(final QueueEntry entry)
+ public boolean visit(final E entry)
{
if(!entry.isAvailable())
{
@@ -148,7 +141,7 @@ public class AssignedConsumerMessageGroupManager implements MessageGroupManager
}
}
- public QueueEntry getEntry()
+ public E getEntry()
{
return _entry;
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
index c1c3bd37e6..31c54dcdd2 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
@@ -28,9 +28,9 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.Action;
-public interface BaseQueue extends TransactionLogResource
+public interface BaseQueue<C extends Consumer> extends TransactionLogResource
{
- void enqueue(ServerMessage message, Action<MessageInstance<? extends Consumer>> action) throws AMQException;
+ void enqueue(ServerMessage message, Action<? super MessageInstance<?,C>> action) throws AMQException;
boolean isDurable();
boolean isDeleted();
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java
index c2813bb7a5..a1ff51959c 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java
@@ -26,7 +26,7 @@ import java.util.UUID;
import org.apache.qpid.server.virtualhost.VirtualHost;
-public class ConflationQueue extends SimpleAMQQueue
+public class ConflationQueue extends SimpleAMQQueue<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList>
{
protected ConflationQueue(UUID id,
String name,
@@ -42,7 +42,7 @@ public class ConflationQueue extends SimpleAMQQueue
public String getConflationKey()
{
- return ((ConflationQueueList) getEntries()).getConflationKey();
+ return getEntries().getConflationKey();
}
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
index 7469e95394..a98a4ac144 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
@@ -32,23 +32,38 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
-public class ConflationQueueList extends SimpleQueueEntryList
+public class ConflationQueueList extends OrderedQueueEntryList<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList>
{
private static final Logger LOGGER = LoggerFactory.getLogger(ConflationQueueList.class);
+ private static final HeadCreator<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList> HEAD_CREATOR = new HeadCreator<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList>()
+ {
+
+ @Override
+ public ConflationQueueEntry createHead(final ConflationQueueList list)
+ {
+ return list.createHead();
+ }
+ };
+
private final String _conflationKey;
- private final ConcurrentHashMap<Object, AtomicReference<QueueEntry>> _latestValuesMap =
- new ConcurrentHashMap<Object, AtomicReference<QueueEntry>>();
+ private final ConcurrentHashMap<Object, AtomicReference<ConflationQueueEntry>> _latestValuesMap =
+ new ConcurrentHashMap<Object, AtomicReference<ConflationQueueEntry>>();
- private final QueueEntry _deleteInProgress = new SimpleQueueEntryImpl(this);
- private final QueueEntry _newerEntryAlreadyBeenAndGone = new SimpleQueueEntryImpl(this);
+ private final ConflationQueueEntry _deleteInProgress = new ConflationQueueEntry(this);
+ private final ConflationQueueEntry _newerEntryAlreadyBeenAndGone = new ConflationQueueEntry(this);
- public ConflationQueueList(AMQQueue<QueueConsumer> queue, String conflationKey)
+ public ConflationQueueList(ConflationQueue queue, String conflationKey)
{
- super(queue);
+ super(queue, HEAD_CREATOR);
_conflationKey = conflationKey;
}
+ private ConflationQueueEntry createHead()
+ {
+ return new ConflationQueueEntry(this);
+ }
+
public String getConflationKey()
{
return _conflationKey;
@@ -66,7 +81,7 @@ public class ConflationQueueList extends SimpleQueueEntryList
@Override
public ConflationQueueEntry add(final ServerMessage message)
{
- final ConflationQueueEntry addedEntry = (ConflationQueueEntry) (super.add(message));
+ final ConflationQueueEntry addedEntry = super.add(message);
final Object keyValue = message.getMessageHeader().getHeader(_conflationKey);
if (keyValue != null)
@@ -76,14 +91,14 @@ public class ConflationQueueList extends SimpleQueueEntryList
LOGGER.debug("Adding entry " + addedEntry + " for message " + message.getMessageNumber() + " with conflation key " + keyValue);
}
- final AtomicReference<QueueEntry> referenceToEntry = new AtomicReference<QueueEntry>(addedEntry);
- AtomicReference<QueueEntry> entryReferenceFromMap = null;
- QueueEntry entryFromMap;
+ final AtomicReference<ConflationQueueEntry> referenceToEntry = new AtomicReference<ConflationQueueEntry>(addedEntry);
+ AtomicReference<ConflationQueueEntry> entryReferenceFromMap;
+ ConflationQueueEntry entryFromMap;
// Iterate until we have got a valid atomic reference object and either the referent is newer than the current
// entry, or the current entry has replaced it in the reference. Note that the _deletedEntryPlaceholder is a special value
// indicating that the reference object is no longer valid (it is being removed from the map).
- boolean keepTryingToUpdateEntryReference = true;
+ boolean keepTryingToUpdateEntryReference;
do
{
do
@@ -139,16 +154,16 @@ public class ConflationQueueList extends SimpleQueueEntryList
* adds and removes during execution of this method.</li>
* </ul>
*/
- private AtomicReference<QueueEntry> getOrPutIfAbsent(final Object key, final AtomicReference<QueueEntry> referenceToAddedValue)
+ private AtomicReference<ConflationQueueEntry> getOrPutIfAbsent(final Object key, final AtomicReference<ConflationQueueEntry> referenceToAddedValue)
{
- AtomicReference<QueueEntry> latestValueReference = _latestValuesMap.putIfAbsent(key, referenceToAddedValue);
+ AtomicReference<ConflationQueueEntry> latestValueReference = _latestValuesMap.putIfAbsent(key, referenceToAddedValue);
if(latestValueReference == null)
{
latestValueReference = _latestValuesMap.get(key);
if(latestValueReference == null)
{
- return new AtomicReference<QueueEntry>(_newerEntryAlreadyBeenAndGone);
+ return new AtomicReference<ConflationQueueEntry>(_newerEntryAlreadyBeenAndGone);
}
}
return latestValueReference;
@@ -177,12 +192,17 @@ public class ConflationQueueList extends SimpleQueueEntryList
}
}
- private final class ConflationQueueEntry extends SimpleQueueEntryImpl
+ final class ConflationQueueEntry extends OrderedQueueEntry<ConflationQueueEntry, ConflationQueue, ConflationQueueList>
{
- private AtomicReference<QueueEntry> _latestValueReference;
+ private AtomicReference<ConflationQueueEntry> _latestValueReference;
+
+ private ConflationQueueEntry(final ConflationQueueList queueEntryList)
+ {
+ super(queueEntryList);
+ }
- public ConflationQueueEntry(SimpleQueueEntryList queueEntryList, ServerMessage message)
+ public ConflationQueueEntry(ConflationQueueList queueEntryList, ServerMessage message)
{
super(queueEntryList, message);
}
@@ -206,7 +226,7 @@ public class ConflationQueueList extends SimpleQueueEntryList
}
- public void setLatestValueReference(final AtomicReference<QueueEntry> latestValueReference)
+ public void setLatestValueReference(final AtomicReference<ConflationQueueEntry> latestValueReference)
{
_latestValueReference = latestValueReference;
}
@@ -227,12 +247,12 @@ public class ConflationQueueList extends SimpleQueueEntryList
/**
* Exposed purposes of unit test only.
*/
- Map<Object, AtomicReference<QueueEntry>> getLatestValuesMap()
+ Map<Object, AtomicReference<ConflationQueueEntry>> getLatestValuesMap()
{
return Collections.unmodifiableMap(_latestValuesMap);
}
- static class Factory implements QueueEntryListFactory
+ static class Factory implements QueueEntryListFactory<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList>
{
private final String _conflationKey;
@@ -241,7 +261,8 @@ public class ConflationQueueList extends SimpleQueueEntryList
_conflationKey = conflationKey;
}
- public ConflationQueueList createQueueEntryList(AMQQueue queue)
+ @Override
+ public ConflationQueueList createQueueEntryList(final ConflationQueue queue)
{
return new ConflationQueueList(queue, _conflationKey);
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
index 4c74e5ba0b..e67591ae07 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
@@ -32,22 +32,22 @@ import org.apache.qpid.server.message.ServerMessage;
import java.util.HashMap;
import java.util.Map;
-public class DefinedGroupMessageGroupManager implements MessageGroupManager
+public class DefinedGroupMessageGroupManager<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> implements MessageGroupManager<E,Q,L>
{
private static final Logger _logger = LoggerFactory.getLogger(DefinedGroupMessageGroupManager.class);
private final String _groupId;
private final String _defaultGroup;
private final Map<Object, Group> _groupMap = new HashMap<Object, Group>();
- private final ConsumerResetHelper _resetHelper;
+ private final ConsumerResetHelper<E,Q,L> _resetHelper;
private final class Group
{
private final Object _group;
- private QueueConsumer _consumer;
+ private QueueConsumer<?,E,Q,L> _consumer;
private int _activeCount;
- private Group(final Object key, final QueueConsumer consumer)
+ private Group(final Object key, final QueueConsumer<?,E,Q,L> consumer)
{
_group = key;
_consumer = consumer;
@@ -104,7 +104,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
return !(_consumer == null || (_activeCount == 0 && _consumer.isClosed()));
}
- public QueueConsumer getConsumer()
+ public QueueConsumer<?,E,Q,L> getConsumer()
{
return _consumer;
}
@@ -120,14 +120,14 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
}
}
- public DefinedGroupMessageGroupManager(final String groupId, String defaultGroup, ConsumerResetHelper resetHelper)
+ public DefinedGroupMessageGroupManager(final String groupId, String defaultGroup, ConsumerResetHelper<E,Q,L> resetHelper)
{
_groupId = groupId;
_defaultGroup = defaultGroup;
_resetHelper = resetHelper;
}
- public synchronized QueueConsumer getAssignedConsumer(final QueueEntry entry)
+ public synchronized QueueConsumer<?,E,Q,L> getAssignedConsumer(final E entry)
{
Object groupId = getKey(entry);
@@ -135,19 +135,12 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
return group == null || !group.isValid() ? null : group.getConsumer();
}
- public synchronized boolean acceptMessage(final QueueConsumer sub, final QueueEntry entry)
+ public synchronized boolean acceptMessage(final QueueConsumer<?,E,Q,L> sub, final E entry)
{
- if(assignMessage(sub, entry))
- {
- return entry.acquire(sub);
- }
- else
- {
- return false;
- }
+ return assignMessage(sub, entry) && entry.acquire(sub);
}
- private boolean assignMessage(final QueueConsumer sub, final QueueEntry entry)
+ private boolean assignMessage(final QueueConsumer<?,E,Q,L> sub, final E entry)
{
Object groupId = getKey(entry);
Group group = _groupMap.get(groupId);
@@ -171,7 +164,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
if(assignedSub == sub)
{
- entry.addStateChangeListener(new GroupStateChangeListener(group, entry));
+ entry.addStateChangeListener(new GroupStateChangeListener(group));
return true;
}
else
@@ -180,16 +173,16 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
}
}
- public synchronized QueueEntry findEarliestAssignedAvailableEntry(final QueueConsumer sub)
+ public synchronized E findEarliestAssignedAvailableEntry(final QueueConsumer<?,E,Q,L> sub)
{
EntryFinder visitor = new EntryFinder(sub);
sub.getQueue().visit(visitor);
return visitor.getEntry();
}
- private class EntryFinder implements QueueEntryVisitor
+ private class EntryFinder implements QueueEntryVisitor<E>
{
- private QueueEntry _entry;
+ private E _entry;
private QueueConsumer _sub;
public EntryFinder(final QueueConsumer sub)
@@ -197,7 +190,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
_sub = sub;
}
- public boolean visit(final QueueEntry entry)
+ public boolean visit(final E entry)
{
if(!entry.isAvailable())
{
@@ -218,7 +211,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
}
}
- public QueueEntry getEntry()
+ public E getEntry()
{
return _entry;
}
@@ -229,7 +222,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
{
}
- private Object getKey(QueueEntry entry)
+ private Object getKey(E entry)
{
ServerMessage message = entry.getMessage();
AMQMessageHeader messageHeader = message == null ? null : message.getMessageHeader();
@@ -241,17 +234,16 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
return groupVal;
}
- private class GroupStateChangeListener implements StateChangeListener<MessageInstance<QueueConsumer>, QueueEntry.State>
+ private class GroupStateChangeListener implements StateChangeListener<MessageInstance<?, ? extends QueueConsumer>, QueueEntry.State>
{
private final Group _group;
- public GroupStateChangeListener(final Group group,
- final MessageInstance<QueueConsumer> entry)
+ public GroupStateChangeListener(final Group group)
{
_group = group;
}
- public void stateChanged(final MessageInstance<QueueConsumer> entry,
+ public void stateChanged(final MessageInstance<?, ? extends QueueConsumer> entry,
final MessageInstance.State oldState,
final MessageInstance.State newState)
{
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java
index 740a96bf2d..ba9dbc8a70 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java
@@ -20,20 +20,20 @@
*/
package org.apache.qpid.server.queue;
-public interface MessageGroupManager
+public interface MessageGroupManager<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>>
{
- public interface ConsumerResetHelper
+ public interface ConsumerResetHelper<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>>
{
- public void resetSubPointersForGroups(QueueConsumer consumer, boolean clearAssignments);
+ public void resetSubPointersForGroups(QueueConsumer<?,E,Q,L> consumer, boolean clearAssignments);
- boolean isEntryAheadOfConsumer(QueueEntry entry, QueueConsumer sub);
+ boolean isEntryAheadOfConsumer(E entry, QueueConsumer<?,E,Q,L> sub);
}
- QueueConsumer getAssignedConsumer(QueueEntry entry);
+ QueueConsumer getAssignedConsumer(E entry);
- boolean acceptMessage(QueueConsumer sub, QueueEntry entry);
+ boolean acceptMessage(QueueConsumer<?,E,Q,L> sub, E entry);
- QueueEntry findEarliestAssignedAvailableEntry(QueueConsumer sub);
+ E findEarliestAssignedAvailableEntry(QueueConsumer<?,E,Q,L> sub);
void clearAssignments(QueueConsumer sub);
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntry.java
index 251a1f55ed..369f42b183 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryImpl.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntry.java
@@ -24,46 +24,46 @@ import org.apache.qpid.server.message.ServerMessage;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-public class SimpleQueueEntryImpl extends QueueEntryImpl
+public abstract class OrderedQueueEntry<E extends OrderedQueueEntry<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends OrderedQueueEntryList<E,Q,L>> extends QueueEntryImpl<E,Q,L>
{
- static final AtomicReferenceFieldUpdater<SimpleQueueEntryImpl, SimpleQueueEntryImpl>
+ static final AtomicReferenceFieldUpdater<OrderedQueueEntry, OrderedQueueEntry>
_nextUpdater =
AtomicReferenceFieldUpdater.newUpdater
- (SimpleQueueEntryImpl.class, SimpleQueueEntryImpl.class, "_next");
+ (OrderedQueueEntry.class, OrderedQueueEntry.class, "_next");
- private volatile SimpleQueueEntryImpl _next;
+ private volatile E _next;
- public SimpleQueueEntryImpl(SimpleQueueEntryList queueEntryList)
+ public OrderedQueueEntry(L queueEntryList)
{
super(queueEntryList);
}
- public SimpleQueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message, final long entryId)
+ public OrderedQueueEntry(L queueEntryList, ServerMessage message, final long entryId)
{
super(queueEntryList, message, entryId);
}
- public SimpleQueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message)
+ public OrderedQueueEntry(L queueEntryList, ServerMessage message)
{
super(queueEntryList, message);
}
- public SimpleQueueEntryImpl getNextNode()
+ public E getNextNode()
{
return _next;
}
- public SimpleQueueEntryImpl getNextValidEntry()
+ public E getNextValidEntry()
{
- SimpleQueueEntryImpl next = getNextNode();
+ E next = getNextNode();
while(next != null && next.isDeleted())
{
- final SimpleQueueEntryImpl newNext = next.getNextNode();
+ final E newNext = next.getNextNode();
if(newNext != null)
{
- SimpleQueueEntryList._nextUpdater.compareAndSet(this,next, newNext);
+ OrderedQueueEntryList._nextUpdater.compareAndSet(this,next, newNext);
next = getNextNode();
}
else
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
new file mode 100644
index 0000000000..f2491bdb0c
--- /dev/null
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
@@ -0,0 +1,198 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.message.ServerMessage;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+public abstract class OrderedQueueEntryList<E extends OrderedQueueEntry<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends OrderedQueueEntryList<E,Q,L>> implements SimpleQueueEntryList<E,Q,L>
+{
+
+ private final E _head;
+
+ private volatile E _tail;
+
+ static final AtomicReferenceFieldUpdater<OrderedQueueEntryList, OrderedQueueEntry>
+ _tailUpdater =
+ AtomicReferenceFieldUpdater.newUpdater
+ (OrderedQueueEntryList.class, OrderedQueueEntry.class, "_tail");
+
+
+ private final Q _queue;
+
+ static final AtomicReferenceFieldUpdater<OrderedQueueEntry, OrderedQueueEntry>
+ _nextUpdater = OrderedQueueEntry._nextUpdater;
+
+ private AtomicLong _scavenges = new AtomicLong(0L);
+ private final long _scavengeCount = Integer.getInteger("qpid.queue.scavenge_count", 50);
+ private final AtomicReference<E> _unscavengedHWM = new AtomicReference<E>();
+
+
+ public OrderedQueueEntryList(Q queue, HeadCreator<E,Q,L> headCreator)
+ {
+ _queue = queue;
+ _head = headCreator.createHead((L)this);
+ _tail = _head;
+ }
+
+ void scavenge()
+ {
+ E hwm = _unscavengedHWM.getAndSet(null);
+ E next = _head.getNextValidEntry();
+
+ if(hwm != null)
+ {
+ while (next != null && hwm.compareTo(next)>0)
+ {
+ next = next.getNextValidEntry();
+ }
+ }
+ }
+
+
+ public Q getQueue()
+ {
+ return _queue;
+ }
+
+
+ public E add(ServerMessage message)
+ {
+ E node = createQueueEntry(message);
+ for (;;)
+ {
+ OrderedQueueEntry tail = _tail;
+ OrderedQueueEntry next = tail.getNextNode();
+ if (tail == _tail)
+ {
+ if (next == null)
+ {
+ node.setEntryId(tail.getEntryId()+1);
+ if (_nextUpdater.compareAndSet(tail, null, node))
+ {
+ _tailUpdater.compareAndSet(this, tail, node);
+
+ return node;
+ }
+ }
+ else
+ {
+ _tailUpdater.compareAndSet(this,tail, next);
+ }
+ }
+ }
+ }
+
+ abstract protected E createQueueEntry(ServerMessage<?> message);
+
+ public E next(E node)
+ {
+ return node.getNextValidEntry();
+ }
+
+ public static interface HeadCreator<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>>
+ {
+ E createHead(L list);
+ }
+
+ public static class QueueEntryIteratorImpl<E extends OrderedQueueEntry<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends OrderedQueueEntryList<E,Q,L>> implements QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>>
+ {
+ private E _lastNode;
+
+ QueueEntryIteratorImpl(E startNode)
+ {
+ _lastNode = startNode;
+ }
+
+ public boolean atTail()
+ {
+ return _lastNode.getNextValidEntry() == null;
+ }
+
+ public E getNode()
+ {
+ return _lastNode;
+ }
+
+ public boolean advance()
+ {
+ E nextValidNode = _lastNode.getNextValidEntry();
+
+ if(nextValidNode != null)
+ {
+ _lastNode = nextValidNode;
+ }
+
+ return nextValidNode != null;
+ }
+ }
+
+ public QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> iterator()
+ {
+ return new QueueEntryIteratorImpl<E,Q,L>(_head);
+ }
+
+
+ public E getHead()
+ {
+ return _head;
+ }
+
+ public void entryDeleted(E queueEntry)
+ {
+ E next = _head.getNextNode();
+ E newNext = _head.getNextValidEntry();
+
+ // the head of the queue has not been deleted, hence the deletion must have been mid queue.
+ if (next == newNext)
+ {
+ E unscavengedHWM = _unscavengedHWM.get();
+ while(unscavengedHWM == null || unscavengedHWM.compareTo(queueEntry)<0)
+ {
+ _unscavengedHWM.compareAndSet(unscavengedHWM, queueEntry);
+ unscavengedHWM = _unscavengedHWM.get();
+ }
+ if (_scavenges.incrementAndGet() > _scavengeCount)
+ {
+ _scavenges.set(0L);
+ scavenge();
+ }
+ }
+ else
+ {
+ E unscavengedHWM = _unscavengedHWM.get();
+ if(unscavengedHWM != null && (next == null || unscavengedHWM.compareTo(next) < 0))
+ {
+ _unscavengedHWM.compareAndSet(unscavengedHWM, null);
+ }
+ }
+ }
+
+ public int getPriorities()
+ {
+ return 0;
+ }
+
+
+}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
index 6918ae683c..7cf245c8f8 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
@@ -25,30 +25,30 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Map;
import java.util.UUID;
-public abstract class OutOfOrderQueue extends SimpleAMQQueue
+public abstract class OutOfOrderQueue<E extends QueueEntryImpl<E,Q,L>, Q extends OutOfOrderQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> extends SimpleAMQQueue<E,Q,L>
{
protected OutOfOrderQueue(UUID id, String name, boolean durable,
String owner, boolean autoDelete, boolean exclusive,
- VirtualHost virtualHost, QueueEntryListFactory entryListFactory, Map<String, Object> arguments)
+ VirtualHost virtualHost, QueueEntryListFactory<E,Q,L> entryListFactory, Map<String, Object> arguments)
{
super(id, name, durable, owner, autoDelete, exclusive, virtualHost, entryListFactory, arguments);
}
@Override
- protected void checkConsumersNotAheadOfDelivery(final QueueEntry entry)
+ protected void checkConsumersNotAheadOfDelivery(final E entry)
{
// check that all consumers are not in advance of the entry
- QueueConsumerList.ConsumerNodeIterator subIter = getConsumerList().iterator();
+ QueueConsumerList.ConsumerNodeIterator<E,Q,L> subIter = getConsumerList().iterator();
while(subIter.advance() && !entry.isAcquired())
{
- final QueueConsumer consumer = subIter.getNode().getConsumer();
+ final QueueConsumer<?,E,Q,L> consumer = subIter.getNode().getConsumer();
if(!consumer.isClosed())
{
- QueueContext context = consumer.getQueueContext();
+ QueueContext<E,Q,L> context = consumer.getQueueContext();
if(context != null)
{
- QueueEntry released = context.getReleasedEntry();
+ E released = context.getReleasedEntry();
while(!entry.isAcquired() && (released == null || released.compareTo(entry) > 0))
{
if(QueueContext._releasedUpdater.compareAndSet(context,released,entry))
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java
index 46c2a635b7..4440d045d1 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java
@@ -25,22 +25,22 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Map;
import java.util.UUID;
-public class AMQPriorityQueue extends OutOfOrderQueue
+public class PriorityQueue extends OutOfOrderQueue<PriorityQueueList.PriorityQueueEntry, PriorityQueue, PriorityQueueList>
{
- protected AMQPriorityQueue(UUID id,
- final String name,
- final boolean durable,
- final String owner,
- final boolean autoDelete,
- boolean exclusive,
- final VirtualHost virtualHost,
- Map<String, Object> arguments, int priorities)
+ protected PriorityQueue(UUID id,
+ final String name,
+ final boolean durable,
+ final String owner,
+ final boolean autoDelete,
+ boolean exclusive,
+ final VirtualHost virtualHost,
+ Map<String, Object> arguments, int priorities)
{
super(id, name, durable, owner, autoDelete, exclusive, virtualHost, new PriorityQueueList.Factory(priorities), arguments);
}
public int getPriorities()
{
- return ((PriorityQueueList) getEntries()).getPriorities();
+ return getEntries().getPriorities();
}
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
index 05d84327d4..e877983643 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
@@ -22,132 +22,162 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.message.ServerMessage;
-public class PriorityQueueList implements QueueEntryList<SimpleQueueEntryImpl>
+abstract public class PriorityQueueList extends OrderedQueueEntryList<PriorityQueueList.PriorityQueueEntry, PriorityQueue, PriorityQueueList>
{
- private final AMQQueue<QueueConsumer> _queue;
- private final PriorityQueueEntrySubList[] _priorityLists;
- private final int _priorities;
- private final int _priorityOffset;
- public PriorityQueueList(AMQQueue queue, int priorities)
- {
- _queue = queue;
- _priorityLists = new PriorityQueueEntrySubList[priorities];
- _priorities = priorities;
- _priorityOffset = 5-((priorities + 1)/2);
- for(int i = 0; i < priorities; i++)
- {
- _priorityLists[i] = new PriorityQueueEntrySubList(queue, i);
- }
- }
- public int getPriorities()
+ public PriorityQueueList(final PriorityQueue queue,
+ final HeadCreator<PriorityQueueEntry, PriorityQueue, PriorityQueueList> headCreator)
{
- return _priorities;
+ super(queue, headCreator);
}
- public AMQQueue<QueueConsumer> getQueue()
+ static class PriorityQueueMasterList extends PriorityQueueList
{
- return _queue;
- }
+ private static final HeadCreator<PriorityQueueEntry, PriorityQueue, PriorityQueueList> DUMMY_HEAD_CREATOR =
+ new HeadCreator<PriorityQueueEntry, PriorityQueue, PriorityQueueList>()
+ {
+ @Override
+ public PriorityQueueEntry createHead(final PriorityQueueList list)
+ {
+ return null;
+ }
+ };
+ private final PriorityQueue _queue;
+ private final PriorityQueueEntrySubList[] _priorityLists;
+ private final int _priorities;
+ private final int _priorityOffset;
- public SimpleQueueEntryImpl add(ServerMessage message)
- {
- int index = message.getMessageHeader().getPriority() - _priorityOffset;
- if(index >= _priorities)
+ public PriorityQueueMasterList(PriorityQueue queue, int priorities)
{
- index = _priorities-1;
+ super(queue, DUMMY_HEAD_CREATOR);
+ _queue = queue;
+ _priorityLists = new PriorityQueueEntrySubList[priorities];
+ _priorities = priorities;
+ _priorityOffset = 5-((priorities + 1)/2);
+ for(int i = 0; i < priorities; i++)
+ {
+ _priorityLists[i] = new PriorityQueueEntrySubList(queue, i);
+ }
}
- else if(index < 0)
+
+ public int getPriorities()
{
- index = 0;
+ return _priorities;
}
- return _priorityLists[index].add(message);
-
- }
-
- public SimpleQueueEntryImpl next(SimpleQueueEntryImpl node)
- {
- SimpleQueueEntryImpl next = node.getNextValidEntry();
- if(next == null)
+ public PriorityQueue getQueue()
{
- final QueueEntryList<?> nodeEntryList = node.getQueueEntryList();
- int index;
- for(index = _priorityLists.length-1; _priorityLists[index] != nodeEntryList; index--) {};
+ return _queue;
+ }
- while(next == null && index != 0)
+ public PriorityQueueEntry add(ServerMessage message)
+ {
+ int index = message.getMessageHeader().getPriority() - _priorityOffset;
+ if(index >= _priorities)
+ {
+ index = _priorities-1;
+ }
+ else if(index < 0)
{
- index--;
- next = _priorityLists[index].getHead().getNextValidEntry();
+ index = 0;
}
+ return _priorityLists[index].add(message);
}
- return next;
- }
- private final class PriorityQueueEntryListIterator implements QueueEntryIterator<SimpleQueueEntryImpl>
- {
- private final SimpleQueueEntryList.QueueEntryIteratorImpl[] _iterators = new SimpleQueueEntryList.QueueEntryIteratorImpl[ _priorityLists.length ];
- private SimpleQueueEntryImpl _lastNode;
+ @Override
+ protected PriorityQueueEntry createQueueEntry(final ServerMessage<?> message)
+ {
+ throw new UnsupportedOperationException();
+ }
- PriorityQueueEntryListIterator()
+ public PriorityQueueEntry next(PriorityQueueEntry node)
{
- for(int i = 0; i < _priorityLists.length; i++)
+ PriorityQueueEntry next = node.getNextValidEntry();
+
+ if(next == null)
{
- _iterators[i] = _priorityLists[i].iterator();
+ final PriorityQueueList nodeEntryList = node.getQueueEntryList();
+ int index;
+ for(index = _priorityLists.length-1; _priorityLists[index] != nodeEntryList; index--)
+ {
+ // do nothing loop is just to find the index
+ }
+
+ while(next == null && index != 0)
+ {
+ index--;
+ next = _priorityLists[index].getHead().getNextValidEntry();
+ }
+
}
- _lastNode = _iterators[_iterators.length - 1].getNode();
+ return next;
}
-
- public boolean atTail()
+ private final class PriorityQueueEntryListIterator implements QueueEntryIterator<PriorityQueueEntry, PriorityQueue, PriorityQueueList, QueueConsumer<?,PriorityQueueEntry, PriorityQueue, PriorityQueueList>>
{
- for(int i = 0; i < _iterators.length; i++)
+ private final QueueEntryIterator<PriorityQueueEntry, PriorityQueue, PriorityQueueList,QueueConsumer<?,PriorityQueueEntry, PriorityQueue, PriorityQueueList>>[] _iterators = new QueueEntryIterator[ _priorityLists.length ];
+ private PriorityQueueEntry _lastNode;
+
+ PriorityQueueEntryListIterator()
{
- if(!_iterators[i].atTail())
+ for(int i = 0; i < _priorityLists.length; i++)
{
- return false;
+ _iterators[i] = _priorityLists[i].iterator();
}
+ _lastNode = _iterators[_iterators.length - 1].getNode();
}
- return true;
- }
- public SimpleQueueEntryImpl getNode()
- {
- return _lastNode;
- }
- public boolean advance()
- {
- for(int i = _iterators.length-1; i >= 0; i--)
+ public boolean atTail()
{
- if(_iterators[i].advance())
+ for(int i = 0; i < _iterators.length; i++)
{
- _lastNode = _iterators[i].getNode();
- return true;
+ if(!_iterators[i].atTail())
+ {
+ return false;
+ }
}
+ return true;
+ }
+
+ public PriorityQueueEntry getNode()
+ {
+ return _lastNode;
+ }
+
+ public boolean advance()
+ {
+ for(int i = _iterators.length-1; i >= 0; i--)
+ {
+ if(_iterators[i].advance())
+ {
+ _lastNode = _iterators[i].getNode();
+ return true;
+ }
+ }
+ return false;
}
- return false;
}
- }
- public PriorityQueueEntryListIterator iterator()
- {
- return new PriorityQueueEntryListIterator();
- }
+ public PriorityQueueEntryListIterator iterator()
+ {
- public SimpleQueueEntryImpl getHead()
- {
- return _priorityLists[_priorities-1].getHead();
- }
+ return new PriorityQueueEntryListIterator();
+ }
- public void entryDeleted(final SimpleQueueEntryImpl queueEntry)
- {
+ public PriorityQueueEntry getHead()
+ {
+ return _priorityLists[_priorities-1].getHead();
+ }
- }
+ public void entryDeleted(final PriorityQueueEntry queueEntry)
+ {
- static class Factory implements QueueEntryListFactory
+ }
+ }
+ static class Factory implements QueueEntryListFactory<PriorityQueueEntry, PriorityQueue, PriorityQueueList>
{
private final int _priorities;
@@ -156,26 +186,34 @@ public class PriorityQueueList implements QueueEntryList<SimpleQueueEntryImpl>
_priorities = priorities;
}
- public PriorityQueueList createQueueEntryList(AMQQueue queue)
+ public PriorityQueueList createQueueEntryList(PriorityQueue queue)
{
- return new PriorityQueueList(queue, _priorities);
+ return new PriorityQueueMasterList(queue, _priorities);
}
}
- private static class PriorityQueueEntrySubList extends SimpleQueueEntryList
+ static class PriorityQueueEntrySubList extends PriorityQueueList
{
+ private static final HeadCreator<PriorityQueueEntry, PriorityQueue, PriorityQueueList> HEAD_CREATOR = new HeadCreator<PriorityQueueEntry, PriorityQueue, PriorityQueueList>()
+ {
+ @Override
+ public PriorityQueueEntry createHead(final PriorityQueueList list)
+ {
+ return new PriorityQueueEntry(list);
+ }
+ };
private int _listPriority;
- public PriorityQueueEntrySubList(AMQQueue<QueueConsumer> queue, int listPriority)
+ public PriorityQueueEntrySubList(PriorityQueue queue, int listPriority)
{
- super(queue);
+ super(queue, HEAD_CREATOR);
_listPriority = listPriority;
}
@Override
- protected PriorityQueueEntryImpl createQueueEntry(ServerMessage<?> message)
+ protected PriorityQueueEntry createQueueEntry(ServerMessage<?> message)
{
- return new PriorityQueueEntryImpl(this, message);
+ return new PriorityQueueEntry(this, message);
}
public int getListPriority()
@@ -184,17 +222,22 @@ public class PriorityQueueList implements QueueEntryList<SimpleQueueEntryImpl>
}
}
- private static class PriorityQueueEntryImpl extends SimpleQueueEntryImpl
+ static class PriorityQueueEntry extends OrderedQueueEntry<PriorityQueueEntry, PriorityQueue, PriorityQueueList>
{
- public PriorityQueueEntryImpl(PriorityQueueEntrySubList queueEntryList, ServerMessage<?> message)
+ private PriorityQueueEntry(final PriorityQueueList queueEntryList)
+ {
+ super(queueEntryList);
+ }
+
+ public PriorityQueueEntry(PriorityQueueEntrySubList queueEntryList, ServerMessage<?> message)
{
super(queueEntryList, message);
}
@Override
- public int compareTo(final QueueEntry o)
+ public int compareTo(final PriorityQueueEntry o)
{
- PriorityQueueEntrySubList pqel = (PriorityQueueEntrySubList)((PriorityQueueEntryImpl)o).getQueueEntryList();
+ PriorityQueueEntrySubList pqel = (PriorityQueueEntrySubList)o.getQueueEntryList();
int otherPriority = pqel.getListPriority();
int thisPriority = ((PriorityQueueEntrySubList) getQueueEntryList()).getListPriority();
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
index ff7840255a..5c891797f5 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
@@ -47,7 +47,7 @@ import java.util.concurrent.locks.ReentrantLock;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT;
-class QueueConsumer<T extends ConsumerTarget> implements Consumer
+class QueueConsumer<T extends ConsumerTarget, E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> implements Consumer
{
public static enum State
@@ -61,10 +61,9 @@ class QueueConsumer<T extends ConsumerTarget> implements Consumer
private final AtomicBoolean _targetClosed = new AtomicBoolean(false);
private final AtomicBoolean _closed = new AtomicBoolean(false);
private final long _id;
- private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE);
private final Lock _stateChangeLock = new ReentrantLock();
private final long _createTime = System.currentTimeMillis();
- private final MessageInstance.ConsumerAcquiredState _owningState = new MessageInstance.ConsumerAcquiredState(this);
+ private final MessageInstance.ConsumerAcquiredState<QueueConsumer<T,E,Q,L>> _owningState = new MessageInstance.ConsumerAcquiredState<QueueConsumer<T,E,Q,L>>(this);
private final boolean _acquires;
private final boolean _seesRequeues;
private final String _consumerName;
@@ -74,8 +73,10 @@ class QueueConsumer<T extends ConsumerTarget> implements Consumer
private final FilterManager _filters;
private final Class<? extends ServerMessage> _messageClass;
private final Object _sessionReference;
- private SimpleAMQQueue _queue;
- private GenericActor _logActor;
+ private Q _queue;
+ private GenericActor _logActor = new GenericActor("[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getId())
+ + "(UNKNOWN)"
+ + "] ");
static final EnumMap<ConsumerTarget.State, State> STATE_MAP =
new EnumMap<ConsumerTarget.State, State>(ConsumerTarget.State.class);
@@ -89,10 +90,10 @@ class QueueConsumer<T extends ConsumerTarget> implements Consumer
private final T _target;
private final SubFlushRunner _runner = new SubFlushRunner(this);
- private volatile QueueContext _queueContext;
- private StateChangeListener<? extends Consumer, State> _stateListener = new StateChangeListener<Consumer, State>()
+ private volatile QueueContext<E,Q,L> _queueContext;
+ private StateChangeListener<? super QueueConsumer<T,E,Q,L>, State> _stateListener = new StateChangeListener<QueueConsumer<T,E,Q,L>, State>()
{
- public void stateChanged(Consumer sub, State oldState, State newState)
+ public void stateChanged(QueueConsumer sub, State oldState, State newState)
{
CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString()));
}
@@ -158,8 +159,7 @@ class QueueConsumer<T extends ConsumerTarget> implements Consumer
throw new RuntimeException(e);
}
}
- final StateChangeListener<Consumer, State> stateListener =
- (StateChangeListener<Consumer, State>) getStateListener();
+ final StateChangeListener<? super QueueConsumer<T,E,Q,L>, State> stateListener = getStateListener();
if(stateListener != null)
{
stateListener.stateChanged(this, STATE_MAP.get(oldState), STATE_MAP.get(newState));
@@ -251,12 +251,12 @@ class QueueConsumer<T extends ConsumerTarget> implements Consumer
return STATE_MAP.get(_target.getState());
}
- public final SimpleAMQQueue getQueue()
+ public final Q getQueue()
{
return _queue;
}
- final void setQueue(SimpleAMQQueue queue, boolean exclusive)
+ final void setQueue(Q queue, boolean exclusive)
{
if(getQueue() != null)
{
@@ -300,9 +300,9 @@ class QueueConsumer<T extends ConsumerTarget> implements Consumer
getQueue().flushConsumer(this);
}
- boolean resend(final MessageInstance entry) throws AMQException
+ boolean resend(final E entry) throws AMQException
{
- return getQueue().resend((QueueEntry)entry, this);
+ return getQueue().resend(entry, this);
}
final SubFlushRunner getRunner()
@@ -315,31 +315,26 @@ class QueueConsumer<T extends ConsumerTarget> implements Consumer
return _id;
}
- public final StateChangeListener<? extends Consumer, State> getStateListener()
+ public final StateChangeListener<? super QueueConsumer<T,E,Q,L>, State> getStateListener()
{
return _stateListener;
}
- public final void setStateListener(StateChangeListener<? extends Consumer, State> listener)
+ public final void setStateListener(StateChangeListener<? super QueueConsumer<T,E,Q,L>, State> listener)
{
_stateListener = listener;
}
- final QueueContext getQueueContext()
+ final QueueContext<E,Q,L> getQueueContext()
{
return _queueContext;
}
- final void setQueueContext(QueueContext queueContext)
+ final void setQueueContext(QueueContext<E,Q,L> queueContext)
{
_queueContext = queueContext;
}
- protected boolean updateState(State from, State to)
- {
- return _state.compareAndSet(from, to);
- }
-
public final boolean isActive()
{
return getState() == State.ACTIVE;
@@ -355,7 +350,7 @@ class QueueConsumer<T extends ConsumerTarget> implements Consumer
_noLocal = noLocal;
}
- public final boolean hasInterest(MessageInstance entry)
+ public final boolean hasInterest(E entry)
{
//check that the message hasn't been rejected
if (entry.isRejectedBy(this))
@@ -405,7 +400,6 @@ class QueueConsumer<T extends ConsumerTarget> implements Consumer
filterLogString.append(delimiter);
}
filterLogString.append("Browser");
- hasEntries = true;
}
return filterLogString.toString();
@@ -431,7 +425,7 @@ class QueueConsumer<T extends ConsumerTarget> implements Consumer
return _createTime;
}
- final MessageInstance.ConsumerAcquiredState getOwningState()
+ final MessageInstance.ConsumerAcquiredState<QueueConsumer<T,E,Q,L>> getOwningState()
{
return _owningState;
}
@@ -466,7 +460,7 @@ class QueueConsumer<T extends ConsumerTarget> implements Consumer
return _deliveredCount.longValue();
}
- final void send(final QueueEntry entry, final boolean batch) throws AMQException
+ final void send(final E entry, final boolean batch) throws AMQException
{
_deliveredCount.incrementAndGet();
ServerMessage message = entry.getMessage();
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java
index 82e9d58cf3..2cf0e93e9c 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java
@@ -24,19 +24,19 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-class QueueConsumerList
+class QueueConsumerList<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E, Q,L>, L extends SimpleQueueEntryList<E,Q,L>>
{
- private final ConsumerNode _head = new ConsumerNode();
+ private final ConsumerNode<E,Q,L> _head = new ConsumerNode<E,Q,L>();
- private final AtomicReference<ConsumerNode> _tail = new AtomicReference<ConsumerNode>(_head);
- private final AtomicReference<ConsumerNode> _subNodeMarker = new AtomicReference<ConsumerNode>(_head);
+ private final AtomicReference<ConsumerNode<E,Q,L>> _tail = new AtomicReference<ConsumerNode<E,Q,L>>(_head);
+ private final AtomicReference<ConsumerNode<E,Q,L>> _subNodeMarker = new AtomicReference<ConsumerNode<E,Q,L>>(_head);
private final AtomicInteger _size = new AtomicInteger();
- public static final class ConsumerNode
+ public static final class ConsumerNode<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E, Q,L>, L extends SimpleQueueEntryList<E,Q,L>>
{
private final AtomicBoolean _deleted = new AtomicBoolean();
- private final AtomicReference<ConsumerNode> _next = new AtomicReference<ConsumerNode>();
- private final QueueConsumer _sub;
+ private final AtomicReference<ConsumerNode<E,Q,L>> _next = new AtomicReference<ConsumerNode<E,Q,L>>();
+ private final QueueConsumer<?,E,Q,L> _sub;
public ConsumerNode()
{
@@ -45,7 +45,7 @@ class QueueConsumerList
_deleted.set(true);
}
- public ConsumerNode(final QueueConsumer sub)
+ public ConsumerNode(final QueueConsumer<?,E,Q,L> sub)
{
//used for regular node construction
_sub = sub;
@@ -57,12 +57,12 @@ class QueueConsumerList
*
* @return the next non-deleted node, or null if none was found.
*/
- public ConsumerNode findNext()
+ public ConsumerNode<E,Q,L> findNext()
{
- ConsumerNode next = nextNode();
+ ConsumerNode<E,Q,L> next = nextNode();
while(next != null && next.isDeleted())
{
- final ConsumerNode newNext = next.nextNode();
+ final ConsumerNode<E,Q,L> newNext = next.nextNode();
if(newNext != null)
{
//try to move our _next reference forward to the 'newNext'
@@ -86,7 +86,7 @@ class QueueConsumerList
*
* @return the immediately next node in the structure, or null if at the tail.
*/
- protected ConsumerNode nextNode()
+ protected ConsumerNode<E,Q,L> nextNode()
{
return _next.get();
}
@@ -97,7 +97,7 @@ class QueueConsumerList
* @param node the ConsumerNode to set as 'next'
* @return whether the operation succeeded
*/
- private boolean setNext(final ConsumerNode node)
+ private boolean setNext(final ConsumerNode<E,Q,L> node)
{
return _next.compareAndSet(null, node);
}
@@ -112,18 +112,18 @@ class QueueConsumerList
return _deleted.compareAndSet(false,true);
}
- public QueueConsumer getConsumer()
+ public QueueConsumer<?,E,Q,L> getConsumer()
{
return _sub;
}
}
- private void insert(final ConsumerNode node, final boolean count)
+ private void insert(final ConsumerNode<E,Q,L> node, final boolean count)
{
for (;;)
{
- ConsumerNode tail = _tail.get();
- ConsumerNode next = tail.nextNode();
+ ConsumerNode<E,Q,L> tail = _tail.get();
+ ConsumerNode<E,Q,L> next = tail.nextNode();
if (tail == _tail.get())
{
if (next == null)
@@ -146,16 +146,16 @@ class QueueConsumerList
}
}
- public void add(final QueueConsumer sub)
+ public void add(final QueueConsumer<?,E,Q,L> sub)
{
- ConsumerNode node = new ConsumerNode(sub);
+ ConsumerNode<E,Q,L> node = new ConsumerNode<E,Q,L>(sub);
insert(node, true);
}
- public boolean remove(final QueueConsumer sub)
+ public boolean remove(final QueueConsumer<?, E,Q,L> sub)
{
- ConsumerNode prevNode = _head;
- ConsumerNode node = _head.nextNode();
+ ConsumerNode<E,Q,L> prevNode = _head;
+ ConsumerNode<E,Q,L> node = _head.nextNode();
while(node != null)
{
@@ -170,7 +170,7 @@ class QueueConsumerList
//correctness reasons, however we have just 'deleted'
//the tail. Inserting an empty dummy node after it will
//let us scavenge the node containing the Consumer.
- insert(new ConsumerNode(), false);
+ insert(new ConsumerNode<E,Q,L>(), false);
}
//advance the next node reference in the 'prevNode' to scavenge
@@ -189,9 +189,9 @@ class QueueConsumerList
return false;
}
- private void nodeMarkerCleanup(final ConsumerNode node)
+ private void nodeMarkerCleanup(final ConsumerNode<E,Q,L> node)
{
- ConsumerNode markedNode = _subNodeMarker.get();
+ ConsumerNode<E,Q,L> markedNode = _subNodeMarker.get();
if(node == markedNode)
{
//if the marked node is the one we are removing, then
@@ -200,7 +200,7 @@ class QueueConsumerList
//into the list and find the next node to use.
//Because we inserted a dummy if node was the
//tail, markedNode.nextNode() can never be null.
- ConsumerNode dummy = new ConsumerNode();
+ ConsumerNode<E,Q,L> dummy = new ConsumerNode<E,Q,L>();
dummy.setNext(markedNode.nextNode());
//if the CAS fails the marked node has changed, thus
@@ -219,7 +219,7 @@ class QueueConsumerList
}
}
- public boolean updateMarkedNode(final ConsumerNode expected, final ConsumerNode nextNode)
+ public boolean updateMarkedNode(final ConsumerNode<E,Q,L> expected, final ConsumerNode<E,Q,L> nextNode)
{
return _subNodeMarker.compareAndSet(expected, nextNode);
}
@@ -231,41 +231,41 @@ class QueueConsumerList
*
* @return the previously marked node (or a dummy if it was subsequently deleted)
*/
- public ConsumerNode getMarkedNode()
+ public ConsumerNode<E,Q,L> getMarkedNode()
{
return _subNodeMarker.get();
}
- public static class ConsumerNodeIterator
+ public static class ConsumerNodeIterator<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E, Q,L>, L extends SimpleQueueEntryList<E,Q,L>>
{
- private ConsumerNode _lastNode;
+ private ConsumerNode<E,Q,L> _lastNode;
- ConsumerNodeIterator(ConsumerNode startNode)
+ ConsumerNodeIterator(ConsumerNode<E,Q,L> startNode)
{
_lastNode = startNode;
}
- public ConsumerNode getNode()
+ public ConsumerNode<E,Q,L> getNode()
{
return _lastNode;
}
public boolean advance()
{
- ConsumerNode nextNode = _lastNode.findNext();
+ ConsumerNode<E,Q,L> nextNode = _lastNode.findNext();
_lastNode = nextNode;
return _lastNode != null;
}
}
- public ConsumerNodeIterator iterator()
+ public ConsumerNodeIterator<E,Q,L> iterator()
{
- return new ConsumerNodeIterator(_head);
+ return new ConsumerNodeIterator<E,Q,L>(_head);
}
- public ConsumerNode getHead()
+ public ConsumerNode<E,Q,L> getHead()
{
return _head;
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java
index 861bd3dea1..7a59c154b6 100755
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java
@@ -23,32 +23,32 @@ package org.apache.qpid.server.queue;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-final class QueueContext
+final class QueueContext<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>>
{
- private volatile QueueEntry _lastSeenEntry;
- private volatile QueueEntry _releasedEntry;
+ private volatile E _lastSeenEntry;
+ private volatile E _releasedEntry;
- static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry>
+ static final AtomicReferenceFieldUpdater<QueueContext, QueueEntryImpl>
_lastSeenUpdater =
AtomicReferenceFieldUpdater.newUpdater
- (QueueContext.class, QueueEntry.class, "_lastSeenEntry");
- static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry>
+ (QueueContext.class, QueueEntryImpl.class, "_lastSeenEntry");
+ static final AtomicReferenceFieldUpdater<QueueContext, QueueEntryImpl>
_releasedUpdater =
AtomicReferenceFieldUpdater.newUpdater
- (QueueContext.class, QueueEntry.class, "_releasedEntry");
+ (QueueContext.class, QueueEntryImpl.class, "_releasedEntry");
- public QueueContext(QueueEntry head)
+ public QueueContext(E head)
{
_lastSeenEntry = head;
}
- public QueueEntry getLastSeenEntry()
+ public E getLastSeenEntry()
{
return _lastSeenEntry;
}
- QueueEntry getReleasedEntry()
+ E getReleasedEntry()
{
return _releasedEntry;
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index 6a42088c47..9a49cd6088 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -20,20 +20,21 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.message.MessageInstance;
-public interface QueueEntry extends MessageInstance<QueueConsumer>, Comparable<QueueEntry>
+public interface QueueEntry<E extends QueueEntry<E,Q,C>, Q extends AMQQueue<E,Q,C>, C extends Consumer> extends MessageInstance<E,C>, Comparable<E>
{
- AMQQueue<QueueConsumer> getQueue();
+ Q getQueue();
long getSize();
boolean isQueueDeleted();
- QueueEntry getNextNode();
+ E getNextNode();
- QueueEntry getNextValidEntry();
+ E getNextValidEntry();
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 8b81a87903..8b6968b2cf 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -44,11 +44,11 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-public abstract class QueueEntryImpl implements QueueEntry
+public abstract class QueueEntryImpl<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> implements QueueEntry<E,Q,QueueConsumer<?,E,Q,L>>
{
private static final Logger _log = Logger.getLogger(QueueEntryImpl.class);
- private final QueueEntryList _queueEntryList;
+ private final L _queueEntryList;
private final MessageReference _message;
@@ -63,7 +63,7 @@ public abstract class QueueEntryImpl implements QueueEntry
(QueueEntryImpl.class, EntryState.class, "_state");
- private volatile Set<StateChangeListener<MessageInstance<QueueConsumer>, State>> _stateChangeListeners;
+ private volatile Set<StateChangeListener<? super E, State>> _stateChangeListeners;
private static final
AtomicReferenceFieldUpdater<QueueEntryImpl, Set>
@@ -90,14 +90,14 @@ public abstract class QueueEntryImpl implements QueueEntry
private boolean _deliveredToConsumer;
- public QueueEntryImpl(QueueEntryList<?> queueEntryList)
+ public QueueEntryImpl(L queueEntryList)
{
this(queueEntryList,null,Long.MIN_VALUE);
_state = DELETED_STATE;
}
- public QueueEntryImpl(QueueEntryList<?> queueEntryList, ServerMessage message, final long entryId)
+ public QueueEntryImpl(L queueEntryList, ServerMessage message, final long entryId)
{
_queueEntryList = queueEntryList;
@@ -107,7 +107,7 @@ public abstract class QueueEntryImpl implements QueueEntry
populateInstanceProperties();
}
- public QueueEntryImpl(QueueEntryList<?> queueEntryList, ServerMessage message)
+ public QueueEntryImpl(L queueEntryList, ServerMessage message)
{
_queueEntryList = queueEntryList;
_message = message == null ? null : message.newReference();
@@ -138,7 +138,7 @@ public abstract class QueueEntryImpl implements QueueEntry
return _entryId;
}
- public AMQQueue<QueueConsumer> getQueue()
+ public Q getQueue()
{
return _queueEntryList.getQueue();
}
@@ -234,12 +234,12 @@ public abstract class QueueEntryImpl implements QueueEntry
if(state instanceof ConsumerAcquiredState)
{
- getQueue().decrementUnackedMsgCount(this);
+ getQueue().decrementUnackedMsgCount((E) this);
}
if(!getQueue().isDeleted())
{
- getQueue().requeue(this);
+ getQueue().requeue((E)this);
if(_stateChangeListeners != null)
{
notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
@@ -315,13 +315,12 @@ public abstract class QueueEntryImpl implements QueueEntry
if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
{
- Consumer s = null;
if (state instanceof ConsumerAcquiredState)
{
- getQueue().decrementUnackedMsgCount(this);
+ getQueue().decrementUnackedMsgCount((E) this);
}
- getQueue().dequeue(this,s);
+ getQueue().dequeue((E)this);
if(_stateChangeListeners != null)
{
notifyStateChange(state.getState() , QueueEntry.State.DEQUEUED);
@@ -333,9 +332,9 @@ public abstract class QueueEntryImpl implements QueueEntry
private void notifyStateChange(final State oldState, final State newState)
{
- for(StateChangeListener<MessageInstance<QueueConsumer>, State> l : _stateChangeListeners)
+ for(StateChangeListener<? super E, State> l : _stateChangeListeners)
{
- l.stateChanged(this, oldState, newState);
+ l.stateChanged((E)this, oldState, newState);
}
}
@@ -345,7 +344,7 @@ public abstract class QueueEntryImpl implements QueueEntry
if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
{
- _queueEntryList.entryDeleted(this);
+ _queueEntryList.entryDeleted((E)this);
onDelete();
_message.release();
@@ -364,7 +363,7 @@ public abstract class QueueEntryImpl implements QueueEntry
dispose();
}
- public int routeToAlternate(final Action<MessageInstance<? extends Consumer>> action, ServerTransaction txn)
+ public int routeToAlternate(final Action<? super MessageInstance<?, ? extends Consumer>> action, ServerTransaction txn)
{
final AMQQueue currentQueue = getQueue();
Exchange alternateExchange = currentQueue.getAlternateExchange();
@@ -412,21 +411,21 @@ public abstract class QueueEntryImpl implements QueueEntry
return getQueue().isDeleted();
}
- public void addStateChangeListener(StateChangeListener<MessageInstance<QueueConsumer>, State> listener)
+ public void addStateChangeListener(StateChangeListener<? super E,State> listener)
{
- Set<StateChangeListener<MessageInstance<QueueConsumer>, State>> listeners = _stateChangeListeners;
+ Set<StateChangeListener<? super E, State>> listeners = _stateChangeListeners;
if(listeners == null)
{
- _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener<MessageInstance<QueueConsumer>, State>>());
+ _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener<? super E, State>>());
listeners = _stateChangeListeners;
}
listeners.add(listener);
}
- public boolean removeStateChangeListener(StateChangeListener<MessageInstance<QueueConsumer>, State> listener)
+ public boolean removeStateChangeListener(StateChangeListener<? super E, State> listener)
{
- Set<StateChangeListener<MessageInstance<QueueConsumer>, State>> listeners = _stateChangeListeners;
+ Set<StateChangeListener<? super E, State>> listeners = _stateChangeListeners;
if(listeners != null)
{
return listeners.remove(listener);
@@ -436,9 +435,9 @@ public abstract class QueueEntryImpl implements QueueEntry
}
- public int compareTo(final QueueEntry o)
+ public int compareTo(final E o)
{
- QueueEntryImpl other = (QueueEntryImpl)o;
+ E other = o;
return getEntryId() > other.getEntryId() ? 1 : getEntryId() < other.getEntryId() ? -1 : 0;
}
@@ -446,7 +445,7 @@ public abstract class QueueEntryImpl implements QueueEntry
{
}
- public QueueEntryList getQueueEntryList()
+ public L getQueueEntryList()
{
return _queueEntryList;
}
@@ -494,10 +493,10 @@ public abstract class QueueEntryImpl implements QueueEntry
@Override
public boolean resend() throws AMQException
{
- QueueConsumer sub = getDeliveredConsumer();
+ QueueConsumer<?,E,Q,L> sub = getDeliveredConsumer();
if(sub != null)
{
- return sub.resend(this);
+ return sub.resend((E)this);
}
return false;
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java
index 73ebb0f300..72502feb3d 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java
@@ -20,11 +20,13 @@
*/
package org.apache.qpid.server.queue;
-public interface QueueEntryIterator<QE extends QueueEntry>
+import org.apache.qpid.server.consumer.Consumer;
+
+public interface QueueEntryIterator<E extends QueueEntry<E,Q,C>, Q extends AMQQueue<E,Q,C>, L extends QueueEntryList<E,Q,L,C>, C extends Consumer>
{
boolean atTail();
- QE getNode();
+ E getNode();
boolean advance();
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
index ad1f703f51..a49320e9d6 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
@@ -20,21 +20,23 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.message.ServerMessage;
-public interface QueueEntryList<Q extends QueueEntry>
+public interface QueueEntryList<E extends QueueEntry<E,Q,C>, Q extends AMQQueue<E,Q,C>, L extends QueueEntryList<E,Q,L,C>, C extends Consumer>
{
- AMQQueue<QueueConsumer> getQueue();
+ Q getQueue();
- Q add(ServerMessage message);
+ E add(ServerMessage message);
- Q next(Q node);
+ E next(E node);
- QueueEntryIterator<Q> iterator();
+ QueueEntryIterator<E,Q,L,C> iterator();
- Q getHead();
+ E getHead();
- void entryDeleted(Q queueEntry);
+ void entryDeleted(E queueEntry);
int getPriorities();
+
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java
index 4dbce45f67..ae8b6560be 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.queue;
-interface QueueEntryListFactory
+interface QueueEntryListFactory<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>>
{
- public QueueEntryList createQueueEntryList(AMQQueue queue);
+ public L createQueueEntryList(Q queue);
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java
index 9ecaf6dafd..e6b5ac5611 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java
@@ -16,7 +16,9 @@
*/
package org.apache.qpid.server.queue;
-public interface QueueEntryVisitor
+import org.apache.qpid.server.consumer.Consumer;
+
+public interface QueueEntryVisitor<E extends QueueEntry>
{
- boolean visit(QueueEntry entry);
+ boolean visit(E entry);
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index f4a9794fcd..4450a3ed0c 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -43,7 +43,6 @@ import org.apache.qpid.server.logging.actors.QueueActor;
import org.apache.qpid.server.logging.messages.QueueMessages;
import org.apache.qpid.server.logging.subjects.QueueLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
@@ -52,6 +51,7 @@ import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -59,9 +59,9 @@ import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
-public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
- StateChangeListener<QueueConsumer, QueueConsumer.State>,
- MessageGroupManager.ConsumerResetHelper
+abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E, Q,L>, L extends SimpleQueueEntryList<E,Q,L>> implements AMQQueue<E, Q, QueueConsumer<?,E,Q,L>>,
+ StateChangeListener<QueueConsumer<?,E,Q,L>, QueueConsumer.State>,
+ MessageGroupManager.ConsumerResetHelper<E,Q,L>
{
private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
@@ -94,11 +94,11 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
private Exchange _alternateExchange;
- private final QueueEntryList<QueueEntry> _entries;
+ private final L _entries;
- private final QueueConsumerList _consumerList = new QueueConsumerList();
+ private final QueueConsumerList<E,Q,L> _consumerList = new QueueConsumerList<E,Q,L>();
- private volatile QueueConsumer _exclusiveSubscriber;
+ private volatile QueueConsumer<?,E,Q,L> _exclusiveSubscriber;
@@ -163,8 +163,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
private LogSubject _logSubject;
private LogActor _logActor;
- private static final String SUB_FLUSH_RUNNER = "SUB_FLUSH_RUNNER";
- private boolean _nolocal;
+ private boolean _noLocal;
private final AtomicBoolean _overfull = new AtomicBoolean(false);
private boolean _deleteOnNoConsumers;
@@ -177,20 +176,15 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
/** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */
private int _maximumDeliveryCount;
- private final MessageGroupManager _messageGroupManager;
+ private final MessageGroupManager<E,Q,L> _messageGroupManager;
- private final Collection<ConsumerRegistrationListener> _consumerListeners =
- new ArrayList<ConsumerRegistrationListener>();
+ private final Collection<ConsumerRegistrationListener<Q>> _consumerListeners =
+ new ArrayList<ConsumerRegistrationListener<Q>>();
private AMQQueue.NotificationListener _notificationListener;
private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
- public SimpleAMQQueue(UUID id, String queueName, boolean durable, String owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String, Object> arguments)
- {
- this(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, new SimpleQueueEntryList.Factory(), arguments);
- }
-
protected SimpleAMQQueue(UUID id,
String name,
boolean durable,
@@ -198,7 +192,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
boolean autoDelete,
boolean exclusive,
VirtualHost virtualHost,
- QueueEntryListFactory entryListFactory, Map<String,Object> arguments)
+ QueueEntryListFactory<E,Q,L> entryListFactory, Map<String,Object> arguments)
{
if (name == null)
@@ -217,7 +211,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
_autoDelete = autoDelete;
_exclusive = exclusive;
_virtualHost = virtualHost;
- _entries = entryListFactory.createQueueEntryList(this);
+ _entries = entryListFactory.createQueueEntryList((Q)this);
_arguments = Collections.synchronizedMap(arguments == null ? new LinkedHashMap<String, Object>() : new LinkedHashMap<String, Object>(arguments));
_id = id;
@@ -243,13 +237,13 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
{
Object defaultGroup = arguments.get(Queue.MESSAGE_GROUP_DEFAULT_GROUP);
_messageGroupManager =
- new DefinedGroupMessageGroupManager(String.valueOf(arguments.get(Queue.MESSAGE_GROUP_KEY)),
+ new DefinedGroupMessageGroupManager<E,Q,L>(String.valueOf(arguments.get(Queue.MESSAGE_GROUP_KEY)),
defaultGroup == null ? DEFAULT_SHARED_MESSAGE_GROUP : defaultGroup.toString(),
this);
}
else
{
- _messageGroupManager = new AssignedConsumerMessageGroupManager(String.valueOf(arguments.get(
+ _messageGroupManager = new AssignedConsumerMessageGroupManager<E,Q,L>(String.valueOf(arguments.get(
Queue.MESSAGE_GROUP_KEY)), DEFAULT_MAX_GROUPS);
}
}
@@ -281,21 +275,20 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
}
catch (RejectedExecutionException ree)
{
- if (_stopped.get())
- {
- // Ignore - SubFlusherRunner or QueueRunner submitted execution as queue was being stopped.
- }
- else
+ // Ignore - SubFlusherRunner or QueueRunner submitted execution as queue was being stopped.
+ if(!_stopped.get())
{
_logger.error("Unexpected rejected execution", ree);
throw ree;
+
}
+
}
}
public void setNoLocal(boolean nolocal)
{
- _nolocal = nolocal;
+ _noLocal = nolocal;
}
public UUID getId()
@@ -384,7 +377,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
@Override
- public synchronized QueueConsumer addConsumer(final ConsumerTarget target,
+ public synchronized <T extends ConsumerTarget> QueueConsumer<T,E,Q,L> addConsumer(final T target,
final FilterManager filters,
final Class<? extends ServerMessage> messageClass,
final String consumerName,
@@ -412,10 +405,10 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
throw new ExistingConsumerPreventsExclusive();
}
- QueueConsumer consumer = new QueueConsumer(filters, messageClass,
- optionSet.contains(Consumer.Option.ACQUIRES),
- optionSet.contains(Consumer.Option.SEES_REQUEUES),
- consumerName, optionSet.contains(Consumer.Option.TRANSIENT), target);
+ QueueConsumer<T,E,Q,L> consumer = new QueueConsumer<T,E,Q,L>(filters, messageClass,
+ optionSet.contains(Consumer.Option.ACQUIRES),
+ optionSet.contains(Consumer.Option.SEES_REQUEUES),
+ consumerName, optionSet.contains(Consumer.Option.TRANSIENT), target);
target.consumerAdded(consumer);
@@ -430,21 +423,21 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
}
consumer.setStateListener(this);
- consumer.setQueueContext(new QueueContext(_entries.getHead()));
+ consumer.setQueueContext(new QueueContext<E,Q,L>(_entries.getHead()));
if (!isDeleted())
{
- consumer.setQueue(this, exclusive);
- if(_nolocal)
+ consumer.setQueue((Q)this, exclusive);
+ if(_noLocal)
{
- consumer.setNoLocal(_nolocal);
+ consumer.setNoLocal(true);
}
synchronized (_consumerListeners)
{
- for(ConsumerRegistrationListener listener : _consumerListeners)
+ for(ConsumerRegistrationListener<Q> listener : _consumerListeners)
{
- listener.consumerAdded(this, consumer);
+ listener.consumerAdded((Q)this, consumer);
}
}
@@ -466,7 +459,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
}
- synchronized void unregisterConsumer(final QueueConsumer consumer) throws AMQException
+ synchronized void unregisterConsumer(final QueueConsumer<?,E,Q,L> consumer) throws AMQException
{
if (consumer == null)
{
@@ -494,9 +487,9 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
synchronized (_consumerListeners)
{
- for(ConsumerRegistrationListener listener : _consumerListeners)
+ for(ConsumerRegistrationListener<Q> listener : _consumerListeners)
{
- listener.consumerRemoved(this, consumer);
+ listener.consumerRemoved((Q)this, consumer);
}
}
@@ -519,10 +512,10 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
}
- public Collection<QueueConsumer> getConsumers()
+ public Collection<QueueConsumer<?,E,Q,L>> getConsumers()
{
- List<QueueConsumer> consumers = new ArrayList<QueueConsumer>();
- QueueConsumerList.ConsumerNodeIterator iter = _consumerList.iterator();
+ List<QueueConsumer<?,E,Q,L>> consumers = new ArrayList<QueueConsumer<?,E,Q,L>>();
+ QueueConsumerList.ConsumerNodeIterator<E,Q,L> iter = _consumerList.iterator();
while(iter.advance())
{
consumers.add(iter.getNode().getConsumer());
@@ -531,7 +524,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
}
- public void addConsumerRegistrationListener(final ConsumerRegistrationListener listener)
+ public void addConsumerRegistrationListener(final ConsumerRegistrationListener<Q> listener)
{
synchronized (_consumerListeners)
{
@@ -539,7 +532,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
}
}
- public void removeConsumerRegistrationListener(final ConsumerRegistrationListener listener)
+ public void removeConsumerRegistrationListener(final ConsumerRegistrationListener<Q> listener)
{
synchronized (_consumerListeners)
{
@@ -547,9 +540,9 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
}
}
- public void resetSubPointersForGroups(QueueConsumer consumer, boolean clearAssignments)
+ public void resetSubPointersForGroups(QueueConsumer<?,E,Q,L> consumer, boolean clearAssignments)
{
- QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(consumer);
+ E entry = _messageGroupManager.findEarliestAssignedAvailableEntry(consumer);
if(clearAssignments)
{
_messageGroupManager.clearAssignments(consumer);
@@ -557,11 +550,11 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
if(entry != null)
{
- QueueConsumerList.ConsumerNodeIterator subscriberIter = _consumerList.iterator();
+ QueueConsumerList.ConsumerNodeIterator<E,Q,L> subscriberIter = _consumerList.iterator();
// iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
while (subscriberIter.advance())
{
- QueueConsumer sub = subscriberIter.getNode().getConsumer();
+ QueueConsumer<?,E,Q,L> sub = subscriberIter.getNode().getConsumer();
// we don't make browsers send the same stuff twice
if (sub.seesRequeues())
@@ -599,11 +592,6 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
}
}
- public int getBindingCountHigh()
- {
- return _bindingCountHigh.get();
- }
-
public void removeBinding(final Binding binding)
{
_bindings.remove(binding);
@@ -626,7 +614,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
// ------ Enqueue / Dequeue
- public void enqueue(ServerMessage message, Action<MessageInstance<? extends Consumer>> action) throws AMQException
+ public void enqueue(ServerMessage message, Action<? super MessageInstance<?, QueueConsumer<?,E,Q,L>>> action) throws AMQException
{
incrementQueueCount();
incrementQueueSize(message);
@@ -634,8 +622,8 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
_totalMessagesReceived.incrementAndGet();
- QueueEntry entry;
- final QueueConsumer exclusiveSub = _exclusiveSubscriber;
+ E entry;
+ final QueueConsumer<?,E,Q,L> exclusiveSub = _exclusiveSubscriber;
entry = _entries.add(message);
if(action != null || (exclusiveSub == null && _queueRunner.isIdle()))
@@ -645,8 +633,8 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
iterate over consumers and if any is at the end of the queue and can deliver this message, then deliver the message
*/
- QueueConsumerList.ConsumerNode node = _consumerList.getMarkedNode();
- QueueConsumerList.ConsumerNode nextNode = node.findNext();
+ QueueConsumerList.ConsumerNode<E,Q,L> node = _consumerList.getMarkedNode();
+ QueueConsumerList.ConsumerNode<E,Q,L> nextNode = node.findNext();
if (nextNode == null)
{
nextNode = _consumerList.getHead().findNext();
@@ -682,7 +670,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
else
{
// if consumer at end, and active, offer
- QueueConsumer sub = nextNode.getConsumer();
+ QueueConsumer<?,E,Q,L> sub = nextNode.getConsumer();
deliverToConsumer(sub, entry);
}
nextNode = nextNode.findNext();
@@ -714,7 +702,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
}
- private void deliverToConsumer(final QueueConsumer sub, final QueueEntry entry)
+ private void deliverToConsumer(final QueueConsumer<?,E,Q,L> sub, final E entry)
throws AMQException
{
@@ -746,7 +734,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
}
}
- private boolean assign(final QueueConsumer sub, final QueueEntry entry)
+ private boolean assign(final QueueConsumer<?,E,Q,L> sub, final E entry)
{
if(_messageGroupManager == null)
{
@@ -760,7 +748,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
}
}
- private boolean mightAssign(final QueueConsumer sub, final QueueEntry entry)
+ private boolean mightAssign(final QueueConsumer<?,E,Q,L> sub, final E entry)
{
if(_messageGroupManager == null || !sub.acquires())
{
@@ -770,7 +758,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
return (assigned == null) || (assigned == sub);
}
- protected void checkConsumersNotAheadOfDelivery(final QueueEntry entry)
+ protected void checkConsumersNotAheadOfDelivery(final E entry)
{
// This method is only required for queues which mess with ordering
// Simple Queues don't :-)
@@ -804,7 +792,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
getAtomicQueueCount().incrementAndGet();
}
- private void deliverMessage(final QueueConsumer sub, final QueueEntry entry, boolean batch)
+ private void deliverMessage(final QueueConsumer<?,E,Q,L> sub, final E entry, boolean batch)
throws AMQException
{
setLastSeenEntry(sub, entry);
@@ -815,18 +803,18 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
sub.send(entry, batch);
}
- private boolean consumerReadyAndHasInterest(final QueueConsumer sub, final QueueEntry entry) throws AMQException
+ private boolean consumerReadyAndHasInterest(final QueueConsumer<?,E,Q,L> sub, final E entry) throws AMQException
{
return sub.hasInterest(entry) && (getNextAvailableEntry(sub) == entry);
}
- private void setLastSeenEntry(final QueueConsumer sub, final QueueEntry entry)
+ private void setLastSeenEntry(final QueueConsumer<?,E,Q,L> sub, final E entry)
{
- QueueContext subContext = sub.getQueueContext();
+ QueueContext<E,Q,L> subContext = sub.getQueueContext();
if (subContext != null)
{
- QueueEntry releasedEntry = subContext.getReleasedEntry();
+ E releasedEntry = subContext.getReleasedEntry();
QueueContext._lastSeenUpdater.set(subContext, entry);
if(releasedEntry == entry)
@@ -836,13 +824,13 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
}
}
- private void updateSubRequeueEntry(final QueueConsumer sub, final QueueEntry entry)
+ private void updateSubRequeueEntry(final QueueConsumer<?,E,Q,L> sub, final E entry)
{
- QueueContext subContext = sub.getQueueContext();
+ QueueContext<E,Q,L> subContext = sub.getQueueContext();
if(subContext != null)
{
- QueueEntry oldEntry;
+ E oldEntry;
while((oldEntry = subContext.getReleasedEntry()) == null || oldEntry.compareTo(entry) > 0)
{
@@ -854,13 +842,13 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
}
}
- public void requeue(QueueEntry entry)
+ public void requeue(E entry)
{
- QueueConsumerList.ConsumerNodeIterator subscriberIter = _consumerList.iterator();
+ QueueConsumerList.ConsumerNodeIterator<E,Q,L> subscriberIter = _consumerList.iterator();
// iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
while (subscriberIter.advance() && entry.isAvailable())
{
- QueueConsumer sub = subscriberIter.getNode().getConsumer();
+ QueueConsumer<?,E,Q,L> sub = subscriberIter.getNode().getConsumer();
// we don't make browsers send the same stuff twice
if (sub.seesRequeues())
@@ -873,7 +861,8 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
}
- public void dequeue(QueueEntry entry, Consumer sub)
+ @Override
+ public void dequeue(E entry)
{
decrementQueueCount();
decrementQueueSize(entry);
@@ -886,7 +875,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
}
- private void decrementQueueSize(final QueueEntry entry)
+ private void decrementQueueSize(final E entry)
{
final ServerMessage message = entry.getMessage();
long size = message.getSize();
@@ -905,7 +894,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
_dequeueCount.incrementAndGet();
}
- public boolean resend(final QueueEntry entry, final Consumer consumer) throws AMQException
+ public boolean resend(final E entry, final QueueConsumer<?,E,Q,L> consumer) throws AMQException
{
/* TODO : This is wrong as the consumer may be suspended, we should instead change the state of the message
entry to resend and move back the consumer pointer. */
@@ -915,7 +904,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
{
if (!consumer.isClosed())
{
- deliverMessage((QueueConsumer) consumer, entry, false);
+ deliverMessage(consumer, entry, false);
return true;
}
else
@@ -981,11 +970,11 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
public long getOldestMessageArrivalTime()
{
- QueueEntry entry = getOldestQueueEntry();
+ E entry = getOldestQueueEntry();
return entry == null ? Long.MAX_VALUE : entry.getMessage().getArrivalTime();
}
- protected QueueEntry getOldestQueueEntry()
+ protected E getOldestQueueEntry()
{
return _entries.next(_entries.getHead());
}
@@ -995,13 +984,13 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
return _deleted.get();
}
- public List<QueueEntry> getMessagesOnTheQueue()
+ public List<E> getMessagesOnTheQueue()
{
- ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>();
- QueueEntryIterator queueListIterator = _entries.iterator();
+ ArrayList<E> entryList = new ArrayList<E>();
+ QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
while (queueListIterator.advance())
{
- QueueEntry node = queueListIterator.getNode();
+ E node = queueListIterator.getNode();
if (node != null && !node.isDeleted())
{
entryList.add(node);
@@ -1011,7 +1000,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
}
- public void stateChanged(QueueConsumer sub, QueueConsumer.State oldState, QueueConsumer.State newState)
+ public void stateChanged(QueueConsumer<?,E,Q,L> sub, QueueConsumer.State oldState, QueueConsumer.State newState)
{
if (oldState == QueueConsumer.State.ACTIVE && newState != QueueConsumer.State.ACTIVE)
{
@@ -1029,7 +1018,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
}
}
- public int compareTo(final AMQQueue o)
+ public int compareTo(final Q o)
{
return _name.compareTo(o.getName());
}
@@ -1049,7 +1038,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
return _exclusiveSubscriber != null;
}
- private void setExclusiveSubscriber(QueueConsumer exclusiveSubscriber)
+ private void setExclusiveSubscriber(QueueConsumer<?,E,Q,L> exclusiveSubscriber)
{
_exclusiveSubscriber = exclusiveSubscriber;
}
@@ -1060,32 +1049,32 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
}
/** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
- protected QueueEntryList getEntries()
+ protected L getEntries()
{
return _entries;
}
- protected QueueConsumerList getConsumerList()
+ protected QueueConsumerList<E,Q,L> getConsumerList()
{
return _consumerList;
}
- public static interface QueueEntryFilter
+ public static interface QueueEntryFilter<E extends QueueEntry>
{
- public boolean accept(QueueEntry entry);
+ public boolean accept(E entry);
public boolean filterComplete();
}
- public List<QueueEntry> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId)
+ public List<E> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId)
{
- return getMessagesOnTheQueue(new QueueEntryFilter()
+ return getMessagesOnTheQueue(new QueueEntryFilter<E>()
{
- public boolean accept(QueueEntry entry)
+ public boolean accept(E entry)
{
final long messageId = entry.getMessage().getMessageNumber();
return messageId >= fromMessageId && messageId <= toMessageId;
@@ -1098,13 +1087,13 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
});
}
- public QueueEntry getMessageOnTheQueue(final long messageId)
+ public E getMessageOnTheQueue(final long messageId)
{
- List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
+ List<E> entries = getMessagesOnTheQueue(new QueueEntryFilter<E>()
{
private boolean _complete;
- public boolean accept(QueueEntry entry)
+ public boolean accept(E entry)
{
_complete = entry.getMessage().getMessageNumber() == messageId;
return _complete;
@@ -1118,13 +1107,13 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
return entries.isEmpty() ? null : entries.get(0);
}
- public List<QueueEntry> getMessagesOnTheQueue(QueueEntryFilter filter)
+ public List<E> getMessagesOnTheQueue(QueueEntryFilter<E> filter)
{
- ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>();
- QueueEntryIterator queueListIterator = _entries.iterator();
+ ArrayList<E> entryList = new ArrayList<E>();
+ QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
while (queueListIterator.advance() && !filter.filterComplete())
{
- QueueEntry node = queueListIterator.getNode();
+ E node = queueListIterator.getNode();
if (!node.isDeleted() && filter.accept(node))
{
entryList.add(node);
@@ -1134,13 +1123,13 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
}
- public void visit(final QueueEntryVisitor visitor)
+ public void visit(final QueueEntryVisitor<E> visitor)
{
- QueueEntryIterator queueListIterator = _entries.iterator();
+ QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
while(queueListIterator.advance())
{
- QueueEntry node = queueListIterator.getNode();
+ E node = queueListIterator.getNode();
if(!node.isDeleted())
{
@@ -1157,17 +1146,17 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
*
* The 'queue position' index starts from 1. Using 0 in 'from' will be ignored and continue from 1.
* Using 0 in the 'to' field will return an empty list regardless of the 'from' value.
- * @param fromPosition
- * @param toPosition
- * @return
+ * @param fromPosition first message position
+ * @param toPosition last message position
+ * @return list of messages
*/
- public List<QueueEntry> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition)
+ public List<E> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition)
{
- return getMessagesOnTheQueue(new QueueEntryFilter()
+ return getMessagesOnTheQueue(new QueueEntryFilter<E>()
{
private long position = 0;
- public boolean accept(QueueEntry entry)
+ public boolean accept(E entry)
{
position++;
return (position >= fromPosition) && (position <= toPosition);
@@ -1196,12 +1185,12 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
// TODO - now only used by the tests
public void deleteMessageFromTop()
{
- QueueEntryIterator queueListIterator = _entries.iterator();
+ QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
boolean noDeletes = true;
while (noDeletes && queueListIterator.advance())
{
- QueueEntry node = queueListIterator.getNode();
+ E node = queueListIterator.getNode();
if (node.acquire())
{
dequeueEntry(node);
@@ -1224,14 +1213,14 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
throw new AMQSecurityException("Permission denied: queue " + getName());
}
- QueueEntryIterator queueListIterator = _entries.iterator();
+ QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
long count = 0;
ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
while (queueListIterator.advance())
{
- QueueEntry node = queueListIterator.getNode();
+ E node = queueListIterator.getNode();
if (node.acquire())
{
dequeueEntry(node, txn);
@@ -1248,13 +1237,13 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
return count;
}
- private void dequeueEntry(final QueueEntry node)
+ private void dequeueEntry(final E node)
{
ServerTransaction txn = new AutoCommitTransaction(getVirtualHost().getMessageStore());
dequeueEntry(node, txn);
}
- private void dequeueEntry(final QueueEntry node, ServerTransaction txn)
+ private void dequeueEntry(final E node, ServerTransaction txn)
{
txn.dequeue(this, node.getMessage(),
new ServerTransaction.Action()
@@ -1283,7 +1272,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
}
// TODO list all thrown exceptions
- public int delete() throws AMQSecurityException, AMQException
+ public int delete() throws AMQException
{
// Check access
if (!_virtualHost.getSecurityManager().authoriseDelete(this))
@@ -1313,10 +1302,10 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
}
- List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
+ List<E> entries = getMessagesOnTheQueue(new QueueEntryFilter<E>()
{
- public boolean accept(QueueEntry entry)
+ public boolean accept(E entry)
{
return entry.acquire();
}
@@ -1330,7 +1319,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
- for(final QueueEntry entry : entries)
+ for(final E entry : entries)
{
// TODO log requeues with a post enqueue action
int requeues = entry.routeToAlternate(null, txn);
@@ -1435,7 +1424,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
}
- public void deliverAsync(QueueConsumer sub)
+ public void deliverAsync(QueueConsumer<?,E,Q,L> sub)
{
if(_exclusiveSubscriber == null)
{
@@ -1449,7 +1438,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
}
- void flushConsumer(QueueConsumer sub) throws AMQException
+ void flushConsumer(QueueConsumer<?,E,Q,L> sub) throws AMQException
{
// Access control
if (!getVirtualHost().getSecurityManager().authoriseConsume(this))
@@ -1459,7 +1448,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
flushConsumer(sub, Long.MAX_VALUE);
}
- boolean flushConsumer(QueueConsumer sub, long iterations) throws AMQException
+ boolean flushConsumer(QueueConsumer<?,E,Q,L> sub, long iterations) throws AMQException
{
boolean atTail = false;
final boolean keepSendLockHeld = iterations <= SimpleAMQQueue.MAX_ASYNC_DELIVERIES;
@@ -1480,8 +1469,8 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
sub.getSendLock();
}
- atTail = attemptDelivery((QueueConsumer)sub, true);
- if (atTail && getNextAvailableEntry((QueueConsumer)sub) == null)
+ atTail = attemptDelivery(sub, true);
+ if (atTail && getNextAvailableEntry(sub) == null)
{
queueEmpty = true;
}
@@ -1532,12 +1521,12 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
* Looks up the next node for the consumer and attempts to deliver it.
*
*
- * @param sub
- * @param batch
+ * @param sub the consumer
+ * @param batch true if processing can be batched
* @return true if we have completed all possible deliveries for this sub.
* @throws AMQException
*/
- private boolean attemptDelivery(QueueConsumer sub, boolean batch) throws AMQException
+ private boolean attemptDelivery(QueueConsumer<?,E,Q,L> sub, boolean batch) throws AMQException
{
boolean atTail = false;
@@ -1545,7 +1534,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
if (subActive)
{
- QueueEntry node = getNextAvailableEntry(sub);
+ E node = getNextAvailableEntry(sub);
if (node != null && node.isAvailable())
{
@@ -1582,11 +1571,11 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
protected void advanceAllConsumers() throws AMQException
{
- QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
+ QueueConsumerList.ConsumerNodeIterator<E,Q,L> consumerNodeIterator = _consumerList.iterator();
while (consumerNodeIterator.advance())
{
- QueueConsumerList.ConsumerNode subNode = consumerNodeIterator.getNode();
- QueueConsumer sub = subNode.getConsumer();
+ QueueConsumerList.ConsumerNode<E,Q,L> subNode = consumerNodeIterator.getNode();
+ QueueConsumer<?,E,Q,L> sub = subNode.getConsumer();
if(sub.acquires())
{
getNextAvailableEntry(sub);
@@ -1598,16 +1587,16 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
}
}
- private QueueEntry getNextAvailableEntry(final QueueConsumer sub)
+ private E getNextAvailableEntry(final QueueConsumer<?,E,Q,L> sub)
throws AMQException
{
- QueueContext context = sub.getQueueContext();
+ QueueContext<E,Q,L> context = sub.getQueueContext();
if(context != null)
{
- QueueEntry lastSeen = context.getLastSeenEntry();
- QueueEntry releasedNode = context.getReleasedEntry();
+ E lastSeen = context.getLastSeenEntry();
+ E releasedNode = context.getReleasedEntry();
- QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
+ E node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
boolean expired = false;
while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node) ||
@@ -1639,12 +1628,12 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
}
}
- public boolean isEntryAheadOfConsumer(QueueEntry entry, QueueConsumer sub)
+ public boolean isEntryAheadOfConsumer(E entry, QueueConsumer<?,E,Q,L> sub)
{
- QueueContext context = sub.getQueueContext();
+ QueueContext<E,Q,L> context = sub.getQueueContext();
if(context != null)
{
- QueueEntry releasedNode = context.getReleasedEntry();
+ E releasedNode = context.getReleasedEntry();
return releasedNode != null && releasedNode.compareTo(entry) < 0;
}
else
@@ -1681,7 +1670,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
*/
public long processQueue(QueueRunner runner) throws AMQException
{
- long stateChangeCount = Long.MIN_VALUE;
+ long stateChangeCount;
long previousStateChangeCount = Long.MIN_VALUE;
long rVal = Long.MIN_VALUE;
boolean deliveryIncomplete = true;
@@ -1716,11 +1705,11 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
boolean allConsumersDone = true;
boolean consumerDone;
- QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
+ QueueConsumerList.ConsumerNodeIterator<E,Q,L> consumerNodeIterator = _consumerList.iterator();
//iterate over the subscribers and try to advance their pointer
while (consumerNodeIterator.advance())
{
- QueueConsumer sub = consumerNodeIterator.getNode().getConsumer();
+ QueueConsumer<?,E,Q,L> sub = consumerNodeIterator.getNode().getConsumer();
sub.getSendLock();
try
@@ -1802,11 +1791,11 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
public void checkMessageStatus() throws AMQException
{
- QueueEntryIterator queueListIterator = _entries.iterator();
+ QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
while (queueListIterator.advance())
{
- QueueEntry node = queueListIterator.getNode();
+ E node = queueListIterator.getNode();
// Only process nodes that are not currently deleted and not dequeued
if (!node.isDeleted())
{
@@ -1953,12 +1942,12 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
return _notificationChecks;
}
- private final class QueueEntryListener implements StateChangeListener<MessageInstance<QueueConsumer>, QueueEntry.State>
+ private final class QueueEntryListener implements StateChangeListener<E, QueueEntry.State>
{
- private final QueueConsumer _sub;
+ private final QueueConsumer<?,E,Q,L> _sub;
- public QueueEntryListener(final QueueConsumer sub)
+ public QueueEntryListener(final QueueConsumer<?,E,Q,L> sub)
{
_sub = sub;
}
@@ -1974,7 +1963,7 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
return System.identityHashCode(_sub);
}
- public void stateChanged(MessageInstance entry, QueueEntry.State oldSate, QueueEntry.State newState)
+ public void stateChanged(E entry, QueueEntry.State oldSate, QueueEntry.State newState)
{
entry.removeStateChangeListener(this);
deliverAsync(_sub);
@@ -2082,13 +2071,13 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
return _unackedMsgBytes.get();
}
- public void decrementUnackedMsgCount(QueueEntry queueEntry)
+ public void decrementUnackedMsgCount(E queueEntry)
{
_unackedMsgCount.decrementAndGet();
_unackedMsgBytes.addAndGet(-queueEntry.getSize());
}
- private void incrementUnackedMsgCount(QueueEntry entry)
+ private void incrementUnackedMsgCount(E entry)
{
_unackedMsgCount.incrementAndGet();
_unackedMsgBytes.addAndGet(entry.getSize());
@@ -2159,10 +2148,10 @@ public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
return (String) _arguments.get(Queue.DESCRIPTION);
}
- public final int send(final ServerMessage message,
+ public final <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
final InstanceProperties instanceProperties,
final ServerTransaction txn,
- final Action<MessageInstance<? extends Consumer>> postEnqueueAction)
+ final Action<? super MessageInstance<?, ? extends Consumer>> postEnqueueAction)
{
txn.enqueue(this,message, new ServerTransaction.Action()
{
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
index 101771c7cc..b6954e0696 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
@@ -1,205 +1,25 @@
/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.message.ServerMessage;
-
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-
-public class SimpleQueueEntryList implements QueueEntryList<SimpleQueueEntryImpl>
+public interface SimpleQueueEntryList<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> extends QueueEntryList<E,Q,L,QueueConsumer<?,E,Q,L>>
{
-
- private final SimpleQueueEntryImpl _head;
-
- private volatile SimpleQueueEntryImpl _tail;
-
- static final AtomicReferenceFieldUpdater<SimpleQueueEntryList, SimpleQueueEntryImpl>
- _tailUpdater =
- AtomicReferenceFieldUpdater.newUpdater
- (SimpleQueueEntryList.class, SimpleQueueEntryImpl.class, "_tail");
-
-
- private final AMQQueue<QueueConsumer> _queue;
-
- static final AtomicReferenceFieldUpdater<SimpleQueueEntryImpl, SimpleQueueEntryImpl>
- _nextUpdater = SimpleQueueEntryImpl._nextUpdater;
-
- private AtomicLong _scavenges = new AtomicLong(0L);
- private final long _scavengeCount = Integer.getInteger("qpid.queue.scavenge_count", 50);
- private final AtomicReference<SimpleQueueEntryImpl> _unscavengedHWM = new AtomicReference<SimpleQueueEntryImpl>();
-
-
- public SimpleQueueEntryList(AMQQueue<QueueConsumer> queue)
- {
- _queue = queue;
- _head = new SimpleQueueEntryImpl(this);
- _tail = _head;
- }
-
- void scavenge()
- {
- SimpleQueueEntryImpl hwm = _unscavengedHWM.getAndSet(null);
- SimpleQueueEntryImpl next = _head.getNextValidEntry();
-
- if(hwm != null)
- {
- while (next != null && hwm.compareTo(next)>0)
- {
- next = next.getNextValidEntry();
- }
- }
- }
-
-
- public AMQQueue<QueueConsumer> getQueue()
- {
- return _queue;
- }
-
-
- public SimpleQueueEntryImpl add(ServerMessage message)
- {
- SimpleQueueEntryImpl node = createQueueEntry(message);
- for (;;)
- {
- SimpleQueueEntryImpl tail = _tail;
- SimpleQueueEntryImpl next = tail.getNextNode();
- if (tail == _tail)
- {
- if (next == null)
- {
- node.setEntryId(tail.getEntryId()+1);
- if (_nextUpdater.compareAndSet(tail, null, node))
- {
- _tailUpdater.compareAndSet(this, tail, node);
-
- return node;
- }
- }
- else
- {
- _tailUpdater.compareAndSet(this,tail, next);
- }
- }
- }
- }
-
- protected SimpleQueueEntryImpl createQueueEntry(ServerMessage<?> message)
- {
- return new SimpleQueueEntryImpl(this, message);
- }
-
- public SimpleQueueEntryImpl next(SimpleQueueEntryImpl node)
- {
- return node.getNextValidEntry();
- }
-
- public static class QueueEntryIteratorImpl implements QueueEntryIterator<SimpleQueueEntryImpl>
- {
- private SimpleQueueEntryImpl _lastNode;
-
- QueueEntryIteratorImpl(SimpleQueueEntryImpl startNode)
- {
- _lastNode = startNode;
- }
-
- public boolean atTail()
- {
- return _lastNode.getNextValidEntry() == null;
- }
-
- public SimpleQueueEntryImpl getNode()
- {
- return _lastNode;
- }
-
- public boolean advance()
- {
- SimpleQueueEntryImpl nextValidNode = _lastNode.getNextValidEntry();
-
- if(nextValidNode != null)
- {
- _lastNode = nextValidNode;
- }
-
- return nextValidNode != null;
- }
- }
-
- public QueueEntryIteratorImpl iterator()
- {
- return new QueueEntryIteratorImpl(_head);
- }
-
-
- public SimpleQueueEntryImpl getHead()
- {
- return _head;
- }
-
- public void entryDeleted(SimpleQueueEntryImpl queueEntry)
- {
- SimpleQueueEntryImpl next = _head.getNextNode();
- SimpleQueueEntryImpl newNext = _head.getNextValidEntry();
-
- // the head of the queue has not been deleted, hence the deletion must have been mid queue.
- if (next == newNext)
- {
- SimpleQueueEntryImpl unscavengedHWM = _unscavengedHWM.get();
- while(unscavengedHWM == null || unscavengedHWM.compareTo(queueEntry)<0)
- {
- _unscavengedHWM.compareAndSet(unscavengedHWM, queueEntry);
- unscavengedHWM = _unscavengedHWM.get();
- }
- if (_scavenges.incrementAndGet() > _scavengeCount)
- {
- _scavenges.set(0L);
- scavenge();
- }
- }
- else
- {
- SimpleQueueEntryImpl unscavengedHWM = _unscavengedHWM.get();
- if(unscavengedHWM != null && (next == null || unscavengedHWM.compareTo(next) < 0))
- {
- _unscavengedHWM.compareAndSet(unscavengedHWM, null);
- }
- }
- }
-
- public int getPriorities()
- {
- return 0;
- }
-
- static class Factory implements QueueEntryListFactory
- {
-
- public SimpleQueueEntryList createQueueEntryList(AMQQueue queue)
- {
- return new SimpleQueueEntryList(queue);
- }
- }
-
-
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
index cad1aa6d4f..2cae18f3ec 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
@@ -29,7 +29,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Map;
import java.util.UUID;
-public class SortedQueue extends OutOfOrderQueue
+public class SortedQueue extends OutOfOrderQueue<SortedQueueEntry, SortedQueue, SortedQueueEntryList>
{
//Lock object to synchronize enqueue. Used instead of the object
//monitor to prevent lock order issues with consumer sendLocks
@@ -41,17 +41,33 @@ public class SortedQueue extends OutOfOrderQueue
final boolean durable, final String owner, final boolean autoDelete,
final boolean exclusive, final VirtualHost virtualHost, Map<String, Object> arguments, String sortedPropertyName)
{
+ this(id, name, durable, owner, autoDelete, exclusive,
+ virtualHost, arguments, sortedPropertyName, new SortedQueueEntryListFactory(sortedPropertyName));
+ }
+
+
+ protected SortedQueue(UUID id, final String name,
+ final boolean durable, final String owner, final boolean autoDelete,
+ final boolean exclusive, final VirtualHost virtualHost,
+ Map<String, Object> arguments,
+ String sortedPropertyName,
+ QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList> factory)
+ {
super(id, name, durable, owner, autoDelete, exclusive,
- virtualHost, new SortedQueueEntryListFactory(sortedPropertyName), arguments);
+ virtualHost, factory, arguments);
this._sortedPropertyName = sortedPropertyName;
}
+
public String getSortedPropertyName()
{
return _sortedPropertyName;
}
- public void enqueue(ServerMessage message, Action<MessageInstance<? extends Consumer>> action) throws AMQException
+ @Override
+ public void enqueue(final ServerMessage message,
+ final Action<? super MessageInstance<?, QueueConsumer<?, SortedQueueEntry, SortedQueue, SortedQueueEntryList>>> action)
+ throws AMQException
{
synchronized (_sortedQueueLock)
{
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntry.java
index 1052adbe67..30d58138fb 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryImpl.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntry.java
@@ -24,37 +24,37 @@ import org.apache.qpid.server.message.ServerMessage;
/**
* An implementation of QueueEntryImpl to be used in SortedQueueEntryList.
*/
-public class SortedQueueEntryImpl extends QueueEntryImpl
+public class SortedQueueEntry extends QueueEntryImpl<SortedQueueEntry, SortedQueue, SortedQueueEntryList>
{
public static enum Colour
{
RED, BLACK
};
- private volatile SortedQueueEntryImpl _next;
- private SortedQueueEntryImpl _prev;
+ private volatile SortedQueueEntry _next;
+ private SortedQueueEntry _prev;
private String _key;
private Colour _colour = Colour.BLACK;
- private SortedQueueEntryImpl _parent;
- private SortedQueueEntryImpl _left;
- private SortedQueueEntryImpl _right;
+ private SortedQueueEntry _parent;
+ private SortedQueueEntry _left;
+ private SortedQueueEntry _right;
- public SortedQueueEntryImpl(final SortedQueueEntryList queueEntryList)
+ public SortedQueueEntry(final SortedQueueEntryList queueEntryList)
{
super(queueEntryList);
}
- public SortedQueueEntryImpl(final SortedQueueEntryList queueEntryList,
- final ServerMessage message, final long entryId)
+ public SortedQueueEntry(final SortedQueueEntryList queueEntryList,
+ final ServerMessage message, final long entryId)
{
super(queueEntryList, message, entryId);
}
@Override
- public int compareTo(final QueueEntry o)
+ public int compareTo(final SortedQueueEntry o)
{
- final String otherKey = ((SortedQueueEntryImpl) o)._key;
+ final String otherKey = o._key;
final int compare = _key == null ? (otherKey == null ? 0 : -1) : otherKey == null ? 1 : _key.compareTo(otherKey);
return compare == 0 ? super.compareTo(o) : compare;
}
@@ -69,33 +69,33 @@ public class SortedQueueEntryImpl extends QueueEntryImpl
return _key;
}
- public SortedQueueEntryImpl getLeft()
+ public SortedQueueEntry getLeft()
{
return _left;
}
- public SortedQueueEntryImpl getNextNode()
+ public SortedQueueEntry getNextNode()
{
return _next;
}
@Override
- public SortedQueueEntryImpl getNextValidEntry()
+ public SortedQueueEntry getNextValidEntry()
{
return getNextNode();
}
- public SortedQueueEntryImpl getParent()
+ public SortedQueueEntry getParent()
{
return _parent;
}
- public SortedQueueEntryImpl getPrev()
+ public SortedQueueEntry getPrev()
{
return _prev;
}
- public SortedQueueEntryImpl getRight()
+ public SortedQueueEntry getRight()
{
return _right;
}
@@ -110,27 +110,27 @@ public class SortedQueueEntryImpl extends QueueEntryImpl
_key = key;
}
- public void setLeft(final SortedQueueEntryImpl left)
+ public void setLeft(final SortedQueueEntry left)
{
_left = left;
}
- public void setNext(final SortedQueueEntryImpl next)
+ public void setNext(final SortedQueueEntry next)
{
_next = next;
}
- public void setParent(final SortedQueueEntryImpl parent)
+ public void setParent(final SortedQueueEntry parent)
{
_parent = parent;
}
- public void setPrev(final SortedQueueEntryImpl prev)
+ public void setPrev(final SortedQueueEntry prev)
{
_prev = prev;
}
- public void setRight(final SortedQueueEntryImpl right)
+ public void setRight(final SortedQueueEntry right)
{
_right = right;
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
index 336ee566eb..05b874cd91 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
@@ -21,7 +21,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.queue.SortedQueueEntryImpl.Colour;
+import org.apache.qpid.server.queue.SortedQueueEntry.Colour;
/**
* A sorted implementation of QueueEntryList.
@@ -30,28 +30,28 @@ import org.apache.qpid.server.queue.SortedQueueEntryImpl.Colour;
* ISBN-13: 978-0262033848
* see http://en.wikipedia.org/wiki/Red-black_tree
*/
-public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl>
+public class SortedQueueEntryList implements SimpleQueueEntryList<SortedQueueEntry, SortedQueue, SortedQueueEntryList>
{
- private final SortedQueueEntryImpl _head;
- private SortedQueueEntryImpl _root;
+ private final SortedQueueEntry _head;
+ private SortedQueueEntry _root;
private long _entryId = Long.MIN_VALUE;
private final Object _lock = new Object();
- private final AMQQueue<QueueConsumer> _queue;
+ private final SortedQueue _queue;
private final String _propertyName;
- public SortedQueueEntryList(final AMQQueue<QueueConsumer> queue, final String propertyName)
+ public SortedQueueEntryList(final SortedQueue queue, final String propertyName)
{
_queue = queue;
- _head = new SortedQueueEntryImpl(this);
+ _head = new SortedQueueEntry(this);
_propertyName = propertyName;
}
- public AMQQueue<QueueConsumer> getQueue()
+ public SortedQueue getQueue()
{
return _queue;
}
- public SortedQueueEntryImpl add(final ServerMessage message)
+ public SortedQueueEntry add(final ServerMessage message)
{
synchronized(_lock)
{
@@ -62,7 +62,7 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl
key = val.toString();
}
- final SortedQueueEntryImpl entry = new SortedQueueEntryImpl(this,message, ++_entryId);
+ final SortedQueueEntry entry = new SortedQueueEntry(this,message, ++_entryId);
entry.setKey(key);
insert(entry);
@@ -75,9 +75,9 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl
* Red Black Tree insert implementation.
* @param entry the entry to insert.
*/
- private void insert(final SortedQueueEntryImpl entry)
+ private void insert(final SortedQueueEntry entry)
{
- SortedQueueEntryImpl node = _root;
+ SortedQueueEntry node;
if((node = _root) == null)
{
_root = entry;
@@ -87,7 +87,7 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl
}
else
{
- SortedQueueEntryImpl parent = null;
+ SortedQueueEntry parent = null;
while(node != null)
{
parent = node;
@@ -105,7 +105,7 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl
if(entry.compareTo(parent) < 0)
{
parent.setLeft(entry);
- final SortedQueueEntryImpl prev = parent.getPrev();
+ final SortedQueueEntry prev = parent.getPrev();
entry.setNext(parent);
prev.setNext(entry);
entry.setPrev(prev);
@@ -115,7 +115,7 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl
{
parent.setRight(entry);
- final SortedQueueEntryImpl next = parent.getNextValidEntry();
+ final SortedQueueEntry next = parent.getNextValidEntry();
entry.setNext(next);
parent.setNext(entry);
@@ -130,15 +130,15 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl
insertFixup(entry);
}
- private void insertFixup(SortedQueueEntryImpl entry)
+ private void insertFixup(SortedQueueEntry entry)
{
while(isParentColour(entry, Colour.RED))
{
- final SortedQueueEntryImpl grandparent = nodeGrandparent(entry);
+ final SortedQueueEntry grandparent = nodeGrandparent(entry);
if(nodeParent(entry) == leftChild(grandparent))
{
- final SortedQueueEntryImpl y = rightChild(grandparent);
+ final SortedQueueEntry y = rightChild(grandparent);
if(isNodeColour(y, Colour.RED))
{
setColour(nodeParent(entry), Colour.BLACK);
@@ -160,7 +160,7 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl
}
else
{
- final SortedQueueEntryImpl y = leftChild(grandparent);
+ final SortedQueueEntry y = leftChild(grandparent);
if(isNodeColour(y, Colour.RED))
{
setColour(nodeParent(entry), Colour.BLACK);
@@ -184,11 +184,11 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl
_root.setColour(Colour.BLACK);
}
- private void leftRotate(final SortedQueueEntryImpl entry)
+ private void leftRotate(final SortedQueueEntry entry)
{
if(entry != null)
{
- final SortedQueueEntryImpl rightChild = rightChild(entry);
+ final SortedQueueEntry rightChild = rightChild(entry);
entry.setRight(rightChild.getLeft());
if(entry.getRight() != null)
{
@@ -212,11 +212,11 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl
}
}
- private void rightRotate(final SortedQueueEntryImpl entry)
+ private void rightRotate(final SortedQueueEntry entry)
{
if(entry != null)
{
- final SortedQueueEntryImpl leftChild = leftChild(entry);
+ final SortedQueueEntry leftChild = leftChild(entry);
entry.setLeft(leftChild.getRight());
if(entry.getLeft() != null)
{
@@ -240,7 +240,7 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl
}
}
- private void setColour(final SortedQueueEntryImpl node, final Colour colour)
+ private void setColour(final SortedQueueEntry node, final Colour colour)
{
if(node != null)
{
@@ -248,45 +248,45 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl
}
}
- private SortedQueueEntryImpl leftChild(final SortedQueueEntryImpl node)
+ private SortedQueueEntry leftChild(final SortedQueueEntry node)
{
return node == null ? null : node.getLeft();
}
- private SortedQueueEntryImpl rightChild(final SortedQueueEntryImpl node)
+ private SortedQueueEntry rightChild(final SortedQueueEntry node)
{
return node == null ? null : node.getRight();
}
- private SortedQueueEntryImpl nodeParent(final SortedQueueEntryImpl node)
+ private SortedQueueEntry nodeParent(final SortedQueueEntry node)
{
return node == null ? null : node.getParent();
}
- private SortedQueueEntryImpl nodeGrandparent(final SortedQueueEntryImpl node)
+ private SortedQueueEntry nodeGrandparent(final SortedQueueEntry node)
{
return nodeParent(nodeParent(node));
}
- private boolean isParentColour(final SortedQueueEntryImpl node, final SortedQueueEntryImpl.Colour colour)
+ private boolean isParentColour(final SortedQueueEntry node, final SortedQueueEntry.Colour colour)
{
return node != null && isNodeColour(node.getParent(), colour);
}
- protected boolean isNodeColour(final SortedQueueEntryImpl node, final SortedQueueEntryImpl.Colour colour)
+ protected boolean isNodeColour(final SortedQueueEntry node, final SortedQueueEntry.Colour colour)
{
return (node == null ? Colour.BLACK : node.getColour()) == colour;
}
- public SortedQueueEntryImpl next(final SortedQueueEntryImpl node)
+ public SortedQueueEntry next(final SortedQueueEntry node)
{
synchronized(_lock)
{
if(node.isDeleted() && _head != node)
{
- SortedQueueEntryImpl current = _head;
- SortedQueueEntryImpl next;
+ SortedQueueEntry current = _head;
+ SortedQueueEntry next;
while(current != null)
{
next = current.getNextValidEntry();
@@ -308,22 +308,22 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl
}
}
- public QueueEntryIterator<SortedQueueEntryImpl> iterator()
+ public QueueEntryIterator<SortedQueueEntry,SortedQueue,SortedQueueEntryList,QueueConsumer<?,SortedQueueEntry,SortedQueue,SortedQueueEntryList>> iterator()
{
return new QueueEntryIteratorImpl(_head);
}
- public SortedQueueEntryImpl getHead()
+ public SortedQueueEntry getHead()
{
return _head;
}
- protected SortedQueueEntryImpl getRoot()
+ protected SortedQueueEntry getRoot()
{
return _root;
}
- public void entryDeleted(final SortedQueueEntryImpl entry)
+ public void entryDeleted(final SortedQueueEntry entry)
{
synchronized(_lock)
{
@@ -336,20 +336,20 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl
// Then deal with the easy doubly linked list deletion (need to do
// this after the swap as the swap uses next
- final SortedQueueEntryImpl prev = entry.getPrev();
+ final SortedQueueEntry prev = entry.getPrev();
if(prev != null)
{
prev.setNext(entry.getNextValidEntry());
}
- final SortedQueueEntryImpl next = entry.getNextValidEntry();
+ final SortedQueueEntry next = entry.getNextValidEntry();
if(next != null)
{
next.setPrev(prev);
}
// now deal with splicing
- final SortedQueueEntryImpl chosenChild;
+ final SortedQueueEntry chosenChild;
if(leftChild(entry) != null)
{
@@ -428,14 +428,14 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl
/**
* Swaps the position of the node in the tree with it's successor
* (that is the node with the next highest key)
- * @param entry
+ * @param entry the entry to be swapped with its successor
*/
- private void swapWithSuccessor(final SortedQueueEntryImpl entry)
+ private void swapWithSuccessor(final SortedQueueEntry entry)
{
- final SortedQueueEntryImpl next = entry.getNextValidEntry();
- final SortedQueueEntryImpl nextParent = next.getParent();
- final SortedQueueEntryImpl nextLeft = next.getLeft();
- final SortedQueueEntryImpl nextRight = next.getRight();
+ final SortedQueueEntry next = entry.getNextValidEntry();
+ final SortedQueueEntry nextParent = next.getParent();
+ final SortedQueueEntry nextLeft = next.getLeft();
+ final SortedQueueEntry nextRight = next.getRight();
final Colour nextColour = next.getColour();
// Special case - the successor is the right child of the node
@@ -530,7 +530,7 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl
}
}
- private void deleteFixup(SortedQueueEntryImpl entry)
+ private void deleteFixup(SortedQueueEntry entry)
{
int i = 0;
while(entry != null && entry != _root
@@ -545,7 +545,7 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl
if(entry == leftChild(nodeParent(entry)))
{
- SortedQueueEntryImpl rightSibling = rightChild(nodeParent(entry));
+ SortedQueueEntry rightSibling = rightChild(nodeParent(entry));
if(isNodeColour(rightSibling, Colour.RED))
{
setColour(rightSibling, Colour.BLACK);
@@ -578,7 +578,7 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl
}
else
{
- SortedQueueEntryImpl leftSibling = leftChild(nodeParent(entry));
+ SortedQueueEntry leftSibling = leftChild(nodeParent(entry));
if(isNodeColour(leftSibling, Colour.RED))
{
setColour(leftSibling, Colour.BLACK);
@@ -613,16 +613,16 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl
setColour(entry, Colour.BLACK);
}
- private Colour getColour(final SortedQueueEntryImpl x)
+ private Colour getColour(final SortedQueueEntry x)
{
return x == null ? null : x.getColour();
}
- public class QueueEntryIteratorImpl implements QueueEntryIterator<SortedQueueEntryImpl>
+ public class QueueEntryIteratorImpl implements QueueEntryIterator<SortedQueueEntry,SortedQueue,SortedQueueEntryList,QueueConsumer<?,SortedQueueEntry,SortedQueue,SortedQueueEntryList>>
{
- private SortedQueueEntryImpl _lastNode;
+ private SortedQueueEntry _lastNode;
- public QueueEntryIteratorImpl(final SortedQueueEntryImpl startNode)
+ public QueueEntryIteratorImpl(final SortedQueueEntry startNode)
{
_lastNode = startNode;
}
@@ -632,7 +632,7 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl
return next(_lastNode) == null;
}
- public SortedQueueEntryImpl getNode()
+ public SortedQueueEntry getNode()
{
return _lastNode;
}
@@ -641,7 +641,7 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl
{
if(!atTail())
{
- SortedQueueEntryImpl nextNode = next(_lastNode);
+ SortedQueueEntry nextNode = next(_lastNode);
while(nextNode.isDeleted() && next(nextNode) != null)
{
nextNode = next(nextNode);
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java
index 87c79178f0..69ffcd67a7 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java
@@ -19,7 +19,7 @@
*/
package org.apache.qpid.server.queue;
-public class SortedQueueEntryListFactory implements QueueEntryListFactory
+public class SortedQueueEntryListFactory implements QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>
{
private final String _propertyName;
@@ -30,9 +30,8 @@ public class SortedQueueEntryListFactory implements QueueEntryListFactory
}
@Override
- public QueueEntryList<SortedQueueEntryImpl> createQueueEntryList(final AMQQueue queue)
+ public SortedQueueEntryList createQueueEntryList(final SortedQueue queue)
{
return new SortedQueueEntryList(queue, _propertyName);
}
-
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java
new file mode 100644
index 0000000000..e2ccdc45cf
--- /dev/null
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.Map;
+import java.util.UUID;
+
+public class StandardQueue extends SimpleAMQQueue<StandardQueueEntry,StandardQueue,StandardQueueEntryList>
+{
+ public StandardQueue(final UUID id,
+ final String name,
+ final boolean durable,
+ final String owner,
+ final boolean autoDelete,
+ final boolean exclusive,
+ final VirtualHost virtualHost,
+ final Map<String, Object> arguments)
+ {
+ super(id, name, durable, owner, autoDelete, exclusive, virtualHost, new StandardQueueEntryList.Factory(), arguments);
+ }
+}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntry.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntry.java
new file mode 100644
index 0000000000..368015e9c0
--- /dev/null
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntry.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.message.ServerMessage;
+
+public class StandardQueueEntry extends OrderedQueueEntry<StandardQueueEntry, StandardQueue, StandardQueueEntryList>
+{
+ protected StandardQueueEntry(final StandardQueueEntryList queueEntryList)
+ {
+ super(queueEntryList);
+ }
+
+ public StandardQueueEntry(final StandardQueueEntryList queueEntryList,
+ final ServerMessage message,
+ final long entryId)
+ {
+ super(queueEntryList, message, entryId);
+ }
+
+ public StandardQueueEntry(final StandardQueueEntryList queueEntryList, final ServerMessage message)
+ {
+ super(queueEntryList, message);
+ }
+
+
+}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java
new file mode 100644
index 0000000000..11ad04e61c
--- /dev/null
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.message.ServerMessage;
+
+public class StandardQueueEntryList extends OrderedQueueEntryList<StandardQueueEntry, StandardQueue, StandardQueueEntryList>
+{
+
+ private static final HeadCreator<StandardQueueEntry, StandardQueue, StandardQueueEntryList> HEAD_CREATOR = new HeadCreator<StandardQueueEntry, StandardQueue, StandardQueueEntryList>()
+ {
+ @Override
+ public StandardQueueEntry createHead(final StandardQueueEntryList list)
+ {
+ return new StandardQueueEntry(list);
+ }
+ };
+
+ public StandardQueueEntryList(final StandardQueue queue)
+ {
+ super(queue, HEAD_CREATOR);
+ }
+
+
+ protected StandardQueueEntry createQueueEntry(ServerMessage<?> message)
+ {
+ return new StandardQueueEntry(this, message);
+ }
+
+ static class Factory implements QueueEntryListFactory<StandardQueueEntry, StandardQueue, StandardQueueEntryList>
+ {
+
+ public StandardQueueEntryList createQueueEntryList(StandardQueue queue)
+ {
+ return new StandardQueueEntryList(queue);
+ }
+ }
+
+}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
index de7369f5ed..eff9bdf433 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
@@ -47,7 +47,7 @@ public class DurableConfigurationStoreHelper
Queue.EXCLUSIVE,
Queue.ALTERNATE_EXCHANGE));
- public static void updateQueue(DurableConfigurationStore store, AMQQueue<? extends Consumer> queue) throws AMQStoreException
+ public static void updateQueue(DurableConfigurationStore store, AMQQueue<?,?,?> queue) throws AMQStoreException
{
Map<String, Object> attributesMap = new LinkedHashMap<String, Object>();
attributesMap.put(Queue.NAME, queue.getName());
@@ -72,7 +72,7 @@ public class DurableConfigurationStoreHelper
store.update(queue.getId(), QUEUE, attributesMap);
}
- public static void createQueue(DurableConfigurationStore store, AMQQueue<? extends Consumer> queue)
+ public static void createQueue(DurableConfigurationStore store, AMQQueue<?,?,?> queue)
throws AMQStoreException
{
Map<String, Object> attributesMap = new HashMap<String, Object>();
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
index f60f173de9..4dd9831454 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
@@ -27,7 +27,7 @@ import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.queue.AMQPriorityQueue;
+import org.apache.qpid.server.queue.PriorityQueue;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.util.BrokerTestHelper;
@@ -113,17 +113,17 @@ public class VirtualHostConfigurationTest extends QpidTestCase
// Check that atest was a priority queue with 5 priorities
AMQQueue atest = vhost.getQueue("atest");
- assertTrue(atest instanceof AMQPriorityQueue);
- assertEquals(5, ((AMQPriorityQueue) atest).getPriorities());
+ assertTrue(atest instanceof PriorityQueue);
+ assertEquals(5, ((PriorityQueue) atest).getPriorities());
// Check that ptest was a priority queue with 10 priorities
AMQQueue ptest = vhost.getQueue("ptest");
- assertTrue(ptest instanceof AMQPriorityQueue);
- assertEquals(10, ((AMQPriorityQueue) ptest).getPriorities());
+ assertTrue(ptest instanceof PriorityQueue);
+ assertEquals(10, ((PriorityQueue) ptest).getPriorities());
// Check that ntest wasn't a priority queue
AMQQueue ntest = vhost.getQueue("ntest");
- assertFalse(ntest instanceof AMQPriorityQueue);
+ assertFalse(ntest instanceof PriorityQueue);
}
public void testQueueAlerts() throws Exception
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
index ea9d0ac693..8cab2e9058 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
@@ -74,7 +74,7 @@ public class TopicExchangeTest extends QpidTestCase
public void testNoRoute() throws AMQException
{
- AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*#b", false, null, false, false,
+ AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*#b", false, null, false, false,
false, null);
_exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null));
@@ -86,7 +86,7 @@ public class TopicExchangeTest extends QpidTestCase
public void testDirectMatch() throws AMQException
{
- AMQQueue<Consumer> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "ab", false, null, false, false,
+ AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "ab", false, null, false, false,
false, null);
_exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null));
@@ -109,7 +109,7 @@ public class TopicExchangeTest extends QpidTestCase
public void testStarMatch() throws AMQException
{
- AMQQueue<Consumer> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*", false, null, false, false, false, null);
+ AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*", false, null, false, false, false, null);
_exchange.registerQueue(new Binding(null, "a.*",queue, _exchange, null));
@@ -140,7 +140,7 @@ public class TopicExchangeTest extends QpidTestCase
public void testHashMatch() throws AMQException
{
- AMQQueue<Consumer> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, false, null);
+ AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, false, null);
_exchange.registerQueue(new Binding(null, "a.#",queue, _exchange, null));
@@ -191,7 +191,7 @@ public class TopicExchangeTest extends QpidTestCase
public void testMidHash() throws AMQException
{
- AMQQueue<Consumer> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false,
+ AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false,
false, null);
_exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null));
@@ -217,7 +217,7 @@ public class TopicExchangeTest extends QpidTestCase
public void testMatchAfterHash() throws AMQException
{
- AMQQueue<Consumer> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
+ AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
false, null);
_exchange.registerQueue(new Binding(null, "a.*.#.b.c",queue, _exchange, null));
@@ -256,7 +256,7 @@ public class TopicExchangeTest extends QpidTestCase
public void testHashAfterHash() throws AMQException
{
- AMQQueue<Consumer> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
+ AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
false, null);
_exchange.registerQueue(new Binding(null, "a.*.#.b.c.#.d",queue, _exchange, null));
@@ -278,7 +278,7 @@ public class TopicExchangeTest extends QpidTestCase
public void testHashHash() throws AMQException
{
- AMQQueue<Consumer> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
+ AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
false, null);
_exchange.registerQueue(new Binding(null, "a.#.*.#.d",queue, _exchange, null));
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
index bcf54c97a4..febce9ea2e 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
@@ -231,7 +231,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
false,
attributes);
- assertEquals("Queue not a priority queue", AMQPriorityQueue.class, queue.getClass());
+ assertEquals("Queue not a priority queue", PriorityQueue.class, queue.getClass());
verifyQueueRegistered("testPriorityQueue");
verifyRegisteredQueueCount(1);
}
@@ -246,7 +246,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
false,
false,
null);
- assertEquals("Queue not a simple queue", SimpleAMQQueue.class, queue.getClass());
+ assertEquals("Queue not a simple queue", StandardQueue.class, queue.getClass());
verifyQueueRegistered(queueName);
//verify that no alternate exchange or DLQ were produced
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java
index d67c70c831..c2291f5eed 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java
@@ -28,6 +28,9 @@ import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.Collections;
+import java.util.UUID;
+
public class ConflationQueueListTest extends TestCase
{
private static final String CONFLATION_KEY = "CONFLATION_KEY";
@@ -37,13 +40,15 @@ public class ConflationQueueListTest extends TestCase
private static final String TEST_KEY_VALUE2 = "testKeyValue2";
private ConflationQueueList _list;
- private AMQQueue _queue = createTestQueue();
+ private ConflationQueue _queue;
@Override
protected void setUp() throws Exception
{
super.setUp();
- _list = new ConflationQueueList(_queue, CONFLATION_KEY);
+ _queue = new ConflationQueue(UUID.randomUUID(), getName(), false, null, false, false, mock(VirtualHost.class),
+ Collections.<String,Object>emptyMap(),CONFLATION_KEY);
+ _list = _queue.getEntries();
}
public void testListHasNoEntries()
@@ -175,7 +180,8 @@ public class ConflationQueueListTest extends TestCase
private int countEntries(ConflationQueueList list)
{
- QueueEntryIterator<SimpleQueueEntryImpl> iterator = list.iterator();
+ QueueEntryIterator<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList,QueueConsumer<?,ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList>> iterator =
+ list.iterator();
int count = 0;
while(iterator.advance())
{
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index 66c12717db..d1bc5effc0 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -33,6 +33,7 @@ import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -45,7 +46,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
-public class MockAMQQueue implements AMQQueue<QueueConsumer>
+public class MockAMQQueue implements AMQQueue
{
private boolean _deleted = false;
private String _name;
@@ -178,6 +179,8 @@ public class MockAMQQueue implements AMQQueue<QueueConsumer>
return null;
}
+
+
public boolean isDurable()
{
return false;
@@ -210,32 +213,55 @@ public class MockAMQQueue implements AMQQueue<QueueConsumer>
}
@Override
- public QueueConsumer addConsumer(final ConsumerTarget target,
+ public boolean resend(final QueueEntry entry, final Consumer consumer) throws AMQException
+ {
+ return false;
+ }
+
+ @Override
+ public void addQueueDeleteTask(final Action task)
+ {
+
+ }
+
+ @Override
+ public void enqueue(final ServerMessage message, final Action action) throws AMQException
+ {
+
+ }
+
+ @Override
+ public int compareTo(final Object o)
+ {
+ return 0;
+ }
+
+ @Override
+ public Consumer addConsumer(final ConsumerTarget target,
final FilterManager filters,
- final Class<? extends ServerMessage> messageClass,
+ final Class messageClass,
final String consumerName,
- final EnumSet<Consumer.Option> options) throws AMQException
+ final EnumSet options) throws AMQException
{
return new QueueConsumer(filters, messageClass, options.contains(Consumer.Option.ACQUIRES),
- options.contains(Consumer.Option.SEES_REQUEUES), consumerName,
- options.contains(Consumer.Option.TRANSIENT), target );
+ options.contains(Consumer.Option.SEES_REQUEUES), consumerName,
+ options.contains(Consumer.Option.TRANSIENT), target );
}
+
public String getName()
{
return _name;
}
- @Override
- public int send(final ServerMessage message,
- final InstanceProperties instanceProperties,
- final ServerTransaction txn,
- final Action<MessageInstance<? extends Consumer>> postEnqueueAction)
+ public int send(final ServerMessage message,
+ final InstanceProperties instanceProperties,
+ final ServerTransaction txn,
+ final Action postEnqueueAction)
{
return 0;
}
-
public Collection<QueueConsumer> getConsumers()
{
return Collections.emptyList();
@@ -312,38 +338,39 @@ public class MockAMQQueue implements AMQQueue<QueueConsumer>
return getMessageCount();
}
- public void enqueue(ServerMessage message, Action<MessageInstance<? extends Consumer>> action) throws AMQException
- {
- }
-
public void requeue(QueueEntry entry)
{
}
- public void dequeue(QueueEntry entry, Consumer sub)
+ public void dequeue(QueueEntry entry)
{
}
- public boolean resend(QueueEntry entry, Consumer consumer) throws AMQException
+ public boolean resend(QueueEntry entry, QueueConsumer consumer) throws AMQException
{
return false;
}
- public void addQueueDeleteTask(Action<AMQQueue> task)
+ @Override
+ public void removeQueueDeleteTask(final Action task)
{
+
}
- public void removeQueueDeleteTask(final Action<AMQQueue> task)
+ @Override
+ public void decrementUnackedMsgCount(final QueueEntry queueEntry)
{
+
}
- public List<QueueEntry> getMessagesOnTheQueue()
+ @Override
+ public List getMessagesOnTheQueue()
{
return null;
}
- public List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId)
+ public List getMessagesOnTheQueue(long fromMessageId, long toMessageId)
{
return null;
}
@@ -363,7 +390,7 @@ public class MockAMQQueue implements AMQQueue<QueueConsumer>
return null;
}
- public List<QueueEntry> getMessagesRangeOnTheQueue(long fromPosition, long toPosition)
+ public List getMessagesRangeOnTheQueue(long fromPosition, long toPosition)
{
return null;
}
@@ -570,10 +597,6 @@ public class MockAMQQueue implements AMQQueue<QueueConsumer>
return 0;
}
- public void decrementUnackedMsgCount(QueueEntry queueEntry)
- {
-
- }
public long getUnackedMessageCount()
{
@@ -617,4 +640,5 @@ public class MockAMQQueue implements AMQQueue<QueueConsumer>
{
return null;
}
+
}
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
index 89b366567d..386bb46044 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
@@ -32,7 +32,7 @@ import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.StateChangeListener;
-public class MockQueueEntry implements QueueEntry
+public class MockMessageInstance implements MessageInstance<MockMessageInstance,Consumer>
{
private ServerMessage _message;
@@ -42,13 +42,15 @@ public class MockQueueEntry implements QueueEntry
return false;
}
- public boolean acquire(QueueConsumer sub)
+ @Override
+ public int getMaximumDeliveryCount()
{
- return false;
+ return 0;
}
@Override
- public int getMaximumDeliveryCount()
+ public int routeToAlternate(final Action<? super MessageInstance<?, ? extends Consumer>> action,
+ final ServerTransaction txn)
{
return 0;
}
@@ -58,27 +60,24 @@ public class MockQueueEntry implements QueueEntry
return false;
}
- public boolean isAcquiredBy(QueueConsumer consumer)
+ @Override
+ public boolean isAcquiredBy(final Consumer consumer)
{
return false;
}
- public void addStateChangeListener(StateChangeListener<MessageInstance<QueueConsumer>, State> listener)
- {
-
- }
-
public void delete()
{
}
- public int routeToAlternate(final Action<MessageInstance<? extends Consumer>> action, final ServerTransaction txn)
+ public boolean expired() throws AMQException
{
- return 0;
+ return false;
}
- public boolean expired() throws AMQException
+ @Override
+ public boolean acquire(final Consumer sub)
{
return false;
}
@@ -88,7 +87,7 @@ public class MockQueueEntry implements QueueEntry
return false;
}
- public QueueConsumer getDeliveredConsumer()
+ public Consumer getDeliveredConsumer()
{
return null;
}
@@ -103,11 +102,6 @@ public class MockQueueEntry implements QueueEntry
return _message;
}
- public AMQQueue<QueueConsumer> getQueue()
- {
- return null;
- }
-
public long getSize()
{
return 0;
@@ -118,32 +112,19 @@ public class MockQueueEntry implements QueueEntry
return false;
}
-
- public boolean isQueueDeleted()
+ public void reject()
{
-
- return false;
}
-
- public boolean isRejectedBy(QueueConsumer consumer)
+ @Override
+ public boolean isRejectedBy(final Consumer consumer)
{
-
return false;
}
- public void reject()
- {
-
-
- }
-
-
public void release()
{
-
-
}
@Override
@@ -153,26 +134,18 @@ public class MockQueueEntry implements QueueEntry
}
- public boolean removeStateChangeListener(StateChangeListener<MessageInstance<QueueConsumer>, State> listener)
- {
-
- return false;
- }
-
public void setRedelivered()
{
-
-
}
public AMQMessageHeader getMessageHeader()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public boolean isPersistent()
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return false;
}
public boolean isRedelivered()
@@ -180,12 +153,6 @@ public class MockQueueEntry implements QueueEntry
return false;
}
- public int compareTo(QueueEntry o)
- {
-
- return 0;
- }
-
public void setMessage(ServerMessage msg)
{
_message = msg;
@@ -195,31 +162,32 @@ public class MockQueueEntry implements QueueEntry
{
return false;
}
-
- public QueueEntry getNextNode()
+ @Override
+ public int getDeliveryCount()
{
- return null;
+ return 0;
}
- public QueueEntry getNextValidEntry()
+ @Override
+ public void incrementDeliveryCount()
{
- return null;
}
@Override
- public int getDeliveryCount()
+ public void decrementDeliveryCount()
{
- return 0;
}
@Override
- public void incrementDeliveryCount()
+ public void addStateChangeListener(final StateChangeListener<? super MockMessageInstance, State> listener)
{
+
}
@Override
- public void decrementDeliveryCount()
+ public boolean removeStateChangeListener(final StateChangeListener<? super MockMessageInstance, State> listener)
{
+ return false;
}
@Override
@@ -237,6 +205,6 @@ public class MockQueueEntry implements QueueEntry
@Override
public TransactionLogResource getOwningResource()
{
- return getQueue();
+ return null;
}
}
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
index e8c0470915..3db5d0fb62 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
@@ -26,12 +26,16 @@ import static org.mockito.Mockito.when;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
+import java.util.Collections;
+import java.util.UUID;
+
public class PriorityQueueListTest extends QpidTestCase
{
private static final byte[] PRIORITIES = {4, 5, 5, 4};
- PriorityQueueList _list = new PriorityQueueList(null, 10);
+ PriorityQueueList _list;
private QueueEntry _priority4message1;
private QueueEntry _priority4message2;
@@ -42,6 +46,17 @@ public class PriorityQueueListTest extends QpidTestCase
{
QueueEntry[] entries = new QueueEntry[PRIORITIES.length];
+ PriorityQueue queue = new PriorityQueue(UUID.randomUUID(),
+ getName(),
+ false,
+ null,
+ false,
+ false,
+ mock(VirtualHost.class),
+ Collections.<String,Object>emptyMap(),
+ 10);
+ _list = queue.getEntries();
+
for (int i = 0; i < PRIORITIES.length; i++)
{
ServerMessage<?> message = mock(ServerMessage.class);
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java
index ced00dc578..56cd29b0bd 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java
@@ -24,6 +24,8 @@ import java.util.Collections;
import junit.framework.AssertionFailedError;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
@@ -34,9 +36,10 @@ import java.util.EnumSet;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.consumer.Consumer;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-public class AMQPriorityQueueTest extends SimpleAMQQueueTest
+public class PriorityQueueTest extends SimpleAMQQueueTestBase
{
@Override
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
index 95139d8740..b67b2dda32 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
@@ -22,12 +22,18 @@ import junit.framework.TestCase;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.MessageInstance.EntryState;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.UUID;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -57,6 +63,15 @@ public abstract class QueueEntryImplTestBase extends TestCase
_queueEntry3 = getQueueEntryImpl(3);
}
+
+ protected void mockLogging()
+ {
+ final LogActor logActor = mock(LogActor.class);
+ when(logActor.getRootMessageLogger()).thenReturn(mock(RootMessageLogger.class));
+ CurrentActor.setDefault(logActor);
+ }
+
+
public void testAcquire()
{
assertTrue("Queue entry should be in AVAILABLE state before invoking of acquire method",
@@ -185,7 +200,9 @@ public abstract class QueueEntryImplTestBase extends TestCase
{
int numberOfEntries = 5;
QueueEntryImpl[] entries = new QueueEntryImpl[numberOfEntries];
- SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test"));
+ StandardQueue queue = new StandardQueue(UUID.randomUUID(), getName(), false, null, false, false,
+ mock(VirtualHost.class), Collections.<String,Object>emptyMap());
+ OrderedQueueEntryList queueEntryList = queue.getEntries();
// create test entries
for(int i = 0; i < numberOfEntries ; i++)
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
index a1bc9e50c7..3af268c189 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
@@ -25,6 +25,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.consumer.Consumer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -32,20 +33,21 @@ import static org.mockito.Mockito.when;
/**
* Abstract test class for QueueEntryList implementations.
*/
-public abstract class QueueEntryListTestBase extends TestCase
+public abstract class QueueEntryListTestBase<E extends QueueEntry<E,Q,C>, Q extends AMQQueue<E,Q,C>, L extends QueueEntryList<E,Q,L,C>, C extends Consumer> extends TestCase
{
- protected static final AMQQueue _testQueue = new MockAMQQueue("test");
- public abstract QueueEntryList<QueueEntry> getTestList();
- public abstract QueueEntryList<QueueEntry> getTestList(boolean newList);
+ public abstract L getTestList();
+ public abstract L getTestList(boolean newList);
public abstract long getExpectedFirstMsgId();
public abstract int getExpectedListLength();
public abstract ServerMessage getTestMessageToAdd() throws AMQException;
public void testGetQueue()
{
- assertEquals("Unexpected head entry returned by getHead()", getTestList().getQueue(), _testQueue);
+ assertEquals("Unexpected head entry returned by getHead()", getTestList().getQueue(), getTestQueue());
}
+ protected abstract Q getTestQueue();
+
/**
* Test to add a message with properties specific to the queue type.
* @see QueueEntryListTestBase#getTestList()
@@ -54,10 +56,10 @@ public abstract class QueueEntryListTestBase extends TestCase
*/
public void testAddSpecificMessage() throws AMQException
{
- final QueueEntryList<QueueEntry> list = getTestList();
+ final L list = getTestList();
list.add(getTestMessageToAdd());
- final QueueEntryIterator<?> iter = list.iterator();
+ final QueueEntryIterator<E,Q,L,C> iter = list.iterator();
int count = 0;
while(iter.advance())
{
@@ -75,11 +77,11 @@ public abstract class QueueEntryListTestBase extends TestCase
*/
public void testAddGenericMessage() throws AMQException
{
- final QueueEntryList<QueueEntry> list = getTestList();
+ final L list = getTestList();
final ServerMessage message = createServerMessage(666l);
list.add(message);
- final QueueEntryIterator<?> iter = list.iterator();
+ final QueueEntryIterator<E,Q,L,C> iter = list.iterator();
int count = 0;
while(iter.advance())
{
@@ -109,8 +111,8 @@ public abstract class QueueEntryListTestBase extends TestCase
*/
public void testListNext()
{
- final QueueEntryList<QueueEntry> entryList = getTestList();
- QueueEntry entry = entryList.getHead();
+ final L entryList = getTestList();
+ E entry = entryList.getHead();
int count = 0;
while(entryList.next(entry) != null)
{
@@ -127,7 +129,7 @@ public abstract class QueueEntryListTestBase extends TestCase
*/
public void testIterator()
{
- final QueueEntryIterator<?> iter = getTestList().iterator();
+ final QueueEntryIterator<E,Q,L,C> iter = getTestList().iterator();
int count = 0;
while(iter.advance())
{
@@ -145,10 +147,10 @@ public abstract class QueueEntryListTestBase extends TestCase
public void testDequeuedMessagedNotPresentInIterator() throws Exception
{
final int numberOfMessages = getExpectedListLength();
- final QueueEntryList<QueueEntry> entryList = getTestList();
+ final L entryList = getTestList();
// dequeue all even messages
- final QueueEntryIterator<?> it1 = entryList.iterator();
+ final QueueEntryIterator<E,Q,L,C> it1 = entryList.iterator();
int counter = 0;
while (it1.advance())
{
@@ -161,7 +163,7 @@ public abstract class QueueEntryListTestBase extends TestCase
}
// iterate and check that dequeued messages are not returned by iterator
- final QueueEntryIterator<?> it2 = entryList.iterator();
+ final QueueEntryIterator<E,Q,L,C> it2 = entryList.iterator();
int counter2 = 0;
while(it2.advance())
{
@@ -180,7 +182,7 @@ public abstract class QueueEntryListTestBase extends TestCase
*/
public void testGetHead()
{
- final QueueEntry head = getTestList().getHead();
+ final E head = getTestList().getHead();
assertNull("Head entry should not contain an actual message", head.getMessage());
assertEquals("Unexpected message id for first list entry", getExpectedFirstMsgId(), getTestList().next(head)
.getMessage().getMessageNumber());
@@ -192,16 +194,16 @@ public abstract class QueueEntryListTestBase extends TestCase
*/
public void testEntryDeleted()
{
- final QueueEntry head = getTestList().getHead();
+ final E head = getTestList().getHead();
- final QueueEntry first = getTestList().next(head);
+ final E first = getTestList().next(head);
first.delete();
- final QueueEntry second = getTestList().next(head);
+ final E second = getTestList().next(head);
assertNotSame("After deletion the next entry should be different", first.getMessage().getMessageNumber(), second
.getMessage().getMessageNumber());
- final QueueEntry third = getTestList().next(first);
+ final E third = getTestList().next(first);
assertEquals("After deletion the deleted nodes next node should be the same as the next from head", second
.getMessage().getMessageNumber(), third.getMessage().getMessageNumber());
}
@@ -215,11 +217,11 @@ public abstract class QueueEntryListTestBase extends TestCase
*/
public void testIteratorIgnoresDeletedFinalNode() throws Exception
{
- QueueEntryList<QueueEntry> list = getTestList(true);
+ L list = getTestList(true);
int i = 0;
- QueueEntry queueEntry1 = list.add(createServerMessage(i++));
- QueueEntry queueEntry2 = list.add(createServerMessage(i++));
+ E queueEntry1 = list.add(createServerMessage(i++));
+ E queueEntry2 = list.add(createServerMessage(i++));
assertSame(queueEntry2, list.next(queueEntry1));
assertNull(list.next(queueEntry2));
@@ -228,7 +230,7 @@ public abstract class QueueEntryListTestBase extends TestCase
queueEntry2.delete();
assertTrue("Deleting node should have succeeded", queueEntry2.isDeleted());
- QueueEntryIterator<QueueEntry> iter = list.iterator();
+ QueueEntryIterator<E,Q,L,C> iter = list.iterator();
//verify the iterator isn't 'atTail', can advance, and returns the 1st QueueEntry
assertFalse("Iterator should not have been 'atTail'", iter.atTail());
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java
index 674af36b77..87182dc8dc 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java
@@ -22,7 +22,7 @@ package org.apache.qpid.server.queue;
import junit.framework.Assert;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.queue.SortedQueueEntryImpl.Colour;
+import org.apache.qpid.server.queue.SortedQueueEntry.Colour;
/**
* Test extension of SortedQueueEntryList that provides data structure validation tests.
@@ -30,22 +30,28 @@ import org.apache.qpid.server.queue.SortedQueueEntryImpl.Colour;
*/
public class SelfValidatingSortedQueueEntryList extends SortedQueueEntryList
{
- public SelfValidatingSortedQueueEntryList(AMQQueue queue, String propertyName)
+ public SelfValidatingSortedQueueEntryList(SortedQueue queue, String propertyName)
{
super(queue, propertyName);
}
+ @Override
+ public SortedQueue getQueue()
+ {
+ return super.getQueue();
+ }
+
@Override /** Overridden to automatically check queue properties before and after. */
- public SortedQueueEntryImpl add(final ServerMessage message)
+ public SortedQueueEntry add(final ServerMessage message)
{
assertQueueProperties(); //before add
- final SortedQueueEntryImpl result = super.add(message);
+ final SortedQueueEntry result = super.add(message);
assertQueueProperties(); //after add
return result;
}
@Override /** Overridden to automatically check queue properties before and after. */
- public void entryDeleted(SortedQueueEntryImpl entry)
+ public void entryDeleted(SortedQueueEntry entry)
{
assertQueueProperties(); //before delete
super.entryDeleted(entry);
@@ -73,7 +79,7 @@ public class SelfValidatingSortedQueueEntryList extends SortedQueueEntryList
assertTreeIntegrity(getRoot());
}
- public void assertTreeIntegrity(final SortedQueueEntryImpl node)
+ public void assertTreeIntegrity(final SortedQueueEntry node)
{
if(node == null)
{
@@ -109,7 +115,7 @@ public class SelfValidatingSortedQueueEntryList extends SortedQueueEntryList
assertLeavesSameBlackPath(getRoot());
}
- public int assertLeavesSameBlackPath(final SortedQueueEntryImpl node)
+ public int assertLeavesSameBlackPath(final SortedQueueEntry node)
{
if(node == null)
{
@@ -133,7 +139,7 @@ public class SelfValidatingSortedQueueEntryList extends SortedQueueEntryList
assertChildrenOfRedAreBlack(getRoot());
}
- public void assertChildrenOfRedAreBlack(final SortedQueueEntryImpl node)
+ public void assertChildrenOfRedAreBlack(final SortedQueueEntry node)
{
if(node == null)
{
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTestBase.java
index 5abc97cee9..45001bda50 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTestBase.java
@@ -45,7 +45,6 @@ import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter;
-import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.consumer.MockConsumer;
import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.util.Action;
@@ -56,14 +55,13 @@ import org.apache.qpid.test.utils.QpidTestCase;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-public class SimpleAMQQueueTest extends QpidTestCase
+abstract class SimpleAMQQueueTestBase<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> extends QpidTestCase
{
- private static final Logger _logger = Logger.getLogger(SimpleAMQQueueTest.class);
+ private static final Logger _logger = Logger.getLogger(SimpleAMQQueueTestBase.class);
- private SimpleAMQQueue _queue;
+
+ private Q _queue;
private VirtualHost _virtualHost;
private String _qname = "qname";
private String _owner = "owner";
@@ -81,7 +79,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
_virtualHost = BrokerTestHelper.createVirtualHost(getClass().getName());
- _queue = (SimpleAMQQueue) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), _qname, false, _owner,
+ _queue = (Q) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), _qname, false, _owner,
false, false, false, _arguments);
_exchange = (DirectExchange) _virtualHost.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME);
@@ -107,7 +105,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
_queue.stop();
try
{
- _queue = (SimpleAMQQueue) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), null,
+ _queue = (Q) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), null,
false, _owner, false,
false, false, _arguments);
assertNull("Queue was created", _queue);
@@ -118,24 +116,14 @@ public class SimpleAMQQueueTest extends QpidTestCase
e.getMessage().contains("name"));
}
- try
- {
- _queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), _qname, false, _owner, false,false, null, Collections.EMPTY_MAP);
- assertNull("Queue was created", _queue);
- }
- catch (IllegalArgumentException e)
- {
- assertTrue("Exception was not about missing vhost",
- e.getMessage().contains("Host"));
- }
-
- _queue = (SimpleAMQQueue) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(),
+ _queue = (Q) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(),
"differentName", false,
_owner, false,
false, false, _arguments);
assertNotNull("Queue was not created", _queue);
}
+
public void testGetVirtualHost()
{
assertEquals("Virtual host was wrong", _virtualHost, _queue.getVirtualHost());
@@ -150,11 +138,11 @@ public class SimpleAMQQueueTest extends QpidTestCase
assertTrue("Queue was not bound to key",
_exchange.isBound(_routingKey,_queue));
assertEquals("Exchange binding count", 1,
- _queue.getBindings().size());
+ _queue.getBindings().size());
assertEquals("Wrong exchange bound", _routingKey,
_queue.getBindings().get(0).getBindingKey());
assertEquals("Wrong exchange bound", _exchange,
- _queue.getBindings().get(0).getExchange());
+ _queue.getBindings().get(0).getExchange());
_exchange.removeBinding(_routingKey, _queue, Collections.EMPTY_MAP);
assertFalse("Routing key was still bound",
@@ -495,22 +483,6 @@ public class SimpleAMQQueueTest extends QpidTestCase
assertNotNull(ex);
}
- public void testAutoDeleteQueue() throws Exception
- {
- _queue.stop();
- _queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), _qname, false, null, true, false, _virtualHost, Collections.EMPTY_MAP);
- _queue.setDeleteOnNoConsumers(true);
-
- ServerMessage message = createMessage(new Long(25));
- _consumer = _queue.addConsumer(_consumerTarget, null, message.getClass(), "test",
- EnumSet.of(Consumer.Option.ACQUIRES,
- Consumer.Option.SEES_REQUEUES));
-
- _queue.enqueue(message, null);
- _consumer.close();
- assertTrue("Queue was not deleted when consumer was removed",
- _queue.isDeleted());
- }
public void testResend() throws Exception
{
@@ -520,12 +492,12 @@ public class SimpleAMQQueueTest extends QpidTestCase
_consumer = _queue.addConsumer(_consumerTarget, null, message.getClass(), "test",
EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES));
- _queue.enqueue(message, new Action<MessageInstance<? extends Consumer>>()
+ _queue.enqueue(message, new Action<MessageInstance<?,? extends Consumer>>()
{
@Override
- public void performAction(final MessageInstance<? extends Consumer> object)
+ public void performAction(final MessageInstance<?,? extends Consumer> object)
{
- QueueEntry entry = (QueueEntry) object;
+ QueueEntryImpl entry = (QueueEntryImpl) object;
entry.setRedelivered();
try
{
@@ -612,7 +584,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
// Get non-existent 0th QueueEntry & check returned list was empty
// (the position parameters in this method are indexed from 1)
- List<QueueEntry> entries = _queue.getMessagesRangeOnTheQueue(0, 0);
+ List<E> entries = _queue.getMessagesRangeOnTheQueue(0, 0);
assertTrue(entries.size() == 0);
// Check that when 'from' is 0 it is ignored and the range continues from 1
@@ -681,22 +653,12 @@ public class SimpleAMQQueueTest extends QpidTestCase
*/
public void testProcessQueueWithUniqueSelectors() throws Exception
{
- TestSimpleQueueEntryListFactory factory = new TestSimpleQueueEntryListFactory();
- SimpleAMQQueue testQueue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), "testQueue", false,"testOwner",
- false, false, _virtualHost, factory, null)
- {
- @Override
- public void deliverAsync(QueueConsumer sub)
- {
- // do nothing, i.e prevent deliveries by the SubFlushRunner
- // when registering the new consumers
- }
- };
+ SimpleAMQQueue testQueue = createNonAsyncDeliverQueue();
// retrieve the QueueEntryList the queue creates and insert the test
// messages, thus avoiding straight-through delivery attempts during
//enqueue() process.
- QueueEntryList list = factory.getQueueEntryList();
+ QueueEntryList list = testQueue.getEntries();
assertNotNull("QueueEntryList should have been created", list);
QueueEntry msg1 = list.add(createMessage(1L));
@@ -748,6 +710,12 @@ public class SimpleAMQQueueTest extends QpidTestCase
verifyReceivedMessages(Collections.singletonList((MessageInstance)msg5), sub3.getMessages());
}
+ private SimpleAMQQueue createNonAsyncDeliverQueue()
+ {
+ TestSimpleQueueEntryListFactory factory = new TestSimpleQueueEntryListFactory();
+ return new NonAsyncDeliverQueue(factory, getVirtualHost());
+ }
+
/**
* Tests that dequeued message is not present in the list returned form
* {@link SimpleAMQQueue#getMessagesOnTheQueue()}
@@ -764,7 +732,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
dequeueMessage(_queue, dequeueMessageIndex);
// get messages on the queue
- List<QueueEntry> entries = _queue.getMessagesOnTheQueue();
+ List<E> entries = _queue.getMessagesOnTheQueue();
// assert queue entries
assertEquals(messageNumber - 1, entries.size());
@@ -801,9 +769,9 @@ public class SimpleAMQQueueTest extends QpidTestCase
dequeueMessage(_queue, dequeueMessageIndex);
// get messages on the queue with filter accepting all available messages
- List<QueueEntry> entries = _queue.getMessagesOnTheQueue(new QueueEntryFilter()
+ List<E> entries = _queue.getMessagesOnTheQueue(new QueueEntryFilter<E>()
{
- public boolean accept(QueueEntry entry)
+ public boolean accept(E entry)
{
return true;
}
@@ -855,12 +823,12 @@ public class SimpleAMQQueueTest extends QpidTestCase
_queue.deleteMessageFromTop();
//get queue entries
- List<QueueEntry> entries = _queue.getMessagesOnTheQueue();
+ List<E> entries = _queue.getMessagesOnTheQueue();
// assert queue entries
assertNotNull("Null is returned from getMessagesOnTheQueue", entries);
assertEquals("Expected " + (messageNumber - 2) + " number of messages but recieved " + entries.size(),
- messageNumber - 2, entries.size());
+ messageNumber - 2, entries.size());
assertEquals("Expected first entry with id 2", 2l,
(entries.get(0).getMessage()).getMessageNumber());
}
@@ -891,241 +859,13 @@ public class SimpleAMQQueueTest extends QpidTestCase
}
// get queue entries
- List<QueueEntry> entries = _queue.getMessagesOnTheQueue();
+ List<E> entries = _queue.getMessagesOnTheQueue();
// assert queue entries
assertNotNull(entries);
assertEquals(0, entries.size());
}
- /**
- * Tests whether dequeued entry is sent to subscriber in result of
- * invocation of {@link SimpleAMQQueue#processQueue(QueueRunner)}
- */
- public void testProcessQueueWithDequeuedEntry()
- {
- // total number of messages to send
- int messageNumber = 4;
- int dequeueMessageIndex = 1;
-
- // create queue with overridden method deliverAsync
- SimpleAMQQueue testQueue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), "test",
- false, "testOwner", false, false, _virtualHost, null)
- {
- @Override
- public void deliverAsync(QueueConsumer sub)
- {
- // do nothing
- }
- };
-
- // put messages
- List<QueueEntry> entries = enqueueGivenNumberOfMessages(testQueue, messageNumber);
-
- // dequeue message
- dequeueMessage(testQueue, dequeueMessageIndex);
-
- // latch to wait for message receipt
- final CountDownLatch latch = new CountDownLatch(messageNumber -1);
-
- // create a consumer
- MockConsumer consumer = new MockConsumer()
- {
- /**
- * Send a message and decrement latch
- * @param entry
- * @param batch
- */
- public void send(MessageInstance entry, boolean batch) throws AMQException
- {
- super.send(entry, batch);
- latch.countDown();
- }
- };
-
- try
- {
- // subscribe
- testQueue.addConsumer(consumer,
- null,
- entries.get(0).getMessage().getClass(),
- "test",
- EnumSet.of(Consumer.Option.ACQUIRES,
- Consumer.Option.SEES_REQUEUES));
-
- // process queue
- testQueue.processQueue(new QueueRunner(testQueue)
- {
- public void run()
- {
- // do nothing
- }
- });
- }
- catch (AMQException e)
- {
- fail("Failure to process queue:" + e.getMessage());
- }
- // wait up to 1 minute for message receipt
- try
- {
- latch.await(1, TimeUnit.MINUTES);
- }
- catch (InterruptedException e1)
- {
- Thread.currentThread().interrupt();
- }
- List<MessageInstance> expected = Arrays.asList((MessageInstance)entries.get(0), entries.get(2), entries.get(3));
- verifyReceivedMessages(expected, consumer.getMessages());
- }
-
- /**
- * Tests that entry in dequeued state are not enqueued and not delivered to consumer
- */
- public void testEnqueueDequeuedEntry()
- {
- // create a queue where each even entry is considered a dequeued
- SimpleAMQQueue queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), "test", false,
- "testOwner", false, false, _virtualHost, new QueueEntryListFactory()
- {
- public QueueEntryList createQueueEntryList(AMQQueue queue)
- {
- /**
- * Override SimpleQueueEntryList to create a dequeued
- * entries for messages with even id
- */
- return new SimpleQueueEntryList(queue)
- {
- /**
- * Entries with even message id are considered
- * dequeued!
- */
- protected SimpleQueueEntryImpl createQueueEntry(final ServerMessage message)
- {
- return new SimpleQueueEntryImpl(this, message)
- {
-
- public boolean isDeleted()
- {
- return (message.getMessageNumber() % 2 == 0);
- }
-
- public boolean isAvailable()
- {
- return !(message.getMessageNumber() % 2 == 0);
- }
-
- @Override
- public boolean acquire(QueueConsumer sub)
- {
- if(message.getMessageNumber() % 2 == 0)
- {
- return false;
- }
- else
- {
- return super.acquire(sub);
- }
- }
- };
- }
- };
- }
- }, null);
- // create a consumer
- MockConsumer consumer = new MockConsumer();
-
- // register consumer
- try
- {
- queue.addConsumer(consumer,
- null,
- createMessage(-1l).getClass(),
- "test",
- EnumSet.of(Consumer.Option.ACQUIRES,
- Consumer.Option.SEES_REQUEUES));
- }
- catch (AMQException e)
- {
- fail("Failure to register consumer:" + e.getMessage());
- }
-
- // put test messages into a queue
- putGivenNumberOfMessages(queue, 4);
-
- // assert received messages
- List<MessageInstance> messages = consumer.getMessages();
- assertEquals("Only 2 messages should be returned", 2, messages.size());
- assertEquals("ID of first message should be 1", 1l,
- (messages.get(0).getMessage()).getMessageNumber());
- assertEquals("ID of second message should be 3", 3l,
- (messages.get(1).getMessage()).getMessageNumber());
- }
-
- public void testActiveConsumerCount() throws Exception
- {
- final SimpleAMQQueue queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), "testActiveConsumerCount", false,
- "testOwner", false, false, _virtualHost, new SimpleQueueEntryList.Factory(), null);
-
- //verify adding an active consumer increases the count
- final MockConsumer consumer1 = new MockConsumer();
- consumer1.setActive(true);
- consumer1.setState(ConsumerTarget.State.ACTIVE);
- assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
- queue.addConsumer(consumer1,
- null,
- createMessage(-1l).getClass(),
- "test",
- EnumSet.of(Consumer.Option.ACQUIRES,
- Consumer.Option.SEES_REQUEUES));
- assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
-
- //verify adding an inactive consumer doesn't increase the count
- final MockConsumer consumer2 = new MockConsumer();
- consumer2.setActive(false);
- consumer2.setState(ConsumerTarget.State.SUSPENDED);
- assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
- queue.addConsumer(consumer2,
- null,
- createMessage(-1l).getClass(),
- "test",
- EnumSet.of(Consumer.Option.ACQUIRES,
- Consumer.Option.SEES_REQUEUES));
- assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
-
- //verify behaviour in face of expected state changes:
-
- //verify a consumer going suspended->active increases the count
- consumer2.setState(ConsumerTarget.State.ACTIVE);
- assertEquals("Unexpected active consumer count", 2, queue.getActiveConsumerCount());
-
- //verify a consumer going active->suspended decreases the count
- consumer2.setState(ConsumerTarget.State.SUSPENDED);
- assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
-
- //verify a consumer going suspended->closed doesn't change the count
- consumer2.setState(ConsumerTarget.State.CLOSED);
- assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
-
- //verify a consumer going active->active doesn't change the count
- consumer1.setState(ConsumerTarget.State.ACTIVE);
- assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
-
- consumer1.setState(ConsumerTarget.State.SUSPENDED);
- assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
-
- //verify a consumer going suspended->suspended doesn't change the count
- consumer1.setState(ConsumerTarget.State.SUSPENDED);
- assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
-
- consumer1.setState(ConsumerTarget.State.ACTIVE);
- assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
-
- //verify a consumer going active->closed decreases the count
- consumer1.setState(ConsumerTarget.State.CLOSED);
- assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
-
- }
public void testNotificationFiredOnEnqueue() throws Exception
{
@@ -1170,12 +910,12 @@ public class SimpleAMQQueueTest extends QpidTestCase
* @param messageNumber
* number of messages to put into queue
*/
- private List<QueueEntry> enqueueGivenNumberOfMessages(AMQQueue queue, int messageNumber)
+ protected List<E> enqueueGivenNumberOfMessages(Q queue, int messageNumber)
{
putGivenNumberOfMessages(queue, messageNumber);
// make sure that all enqueued messages are on the queue
- List<QueueEntry> entries = queue.getMessagesOnTheQueue();
+ List<E> entries = queue.getMessagesOnTheQueue();
assertEquals(messageNumber, entries.size());
for (int i = 0; i < messageNumber; i++)
{
@@ -1196,16 +936,15 @@ public class SimpleAMQQueueTest extends QpidTestCase
* @param queue
* @param messageNumber
*/
- private void putGivenNumberOfMessages(AMQQueue queue, int messageNumber)
+ protected <T extends SimpleAMQQueue> void putGivenNumberOfMessages(T queue, int messageNumber)
{
for (int i = 0; i < messageNumber; i++)
{
// Create message
- Long messageId = new Long(i);
ServerMessage message = null;
try
{
- message = createMessage(messageId);
+ message = createMessage((long)i);
}
catch (AMQException e)
{
@@ -1239,7 +978,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
* @param dequeueMessageIndex
* entry index to dequeue.
*/
- private QueueEntry dequeueMessage(AMQQueue queue, int dequeueMessageIndex)
+ protected QueueEntry dequeueMessage(AMQQueue queue, int dequeueMessageIndex)
{
List<QueueEntry> entries = queue.getMessagesOnTheQueue();
QueueEntry entry = entries.get(dequeueMessageIndex);
@@ -1259,7 +998,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
return entriesList;
}
- private void verifyReceivedMessages(List<MessageInstance> expected,
+ protected void verifyReceivedMessages(List<MessageInstance> expected,
List<MessageInstance> delivered)
{
assertEquals("Consumer did not receive the expected number of messages",
@@ -1272,11 +1011,16 @@ public class SimpleAMQQueueTest extends QpidTestCase
}
}
- public SimpleAMQQueue getQueue()
+ public Q getQueue()
{
return _queue;
}
+ protected void setQueue(Q queue)
+ {
+ _queue = queue;
+ }
+
public MockConsumer getConsumer()
{
return _consumerTarget;
@@ -1310,7 +1054,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
return message;
}
- private static class EntryListAddingAction implements Action<MessageInstance<? extends Consumer>>
+ private static class EntryListAddingAction implements Action<MessageInstance<?,? extends Consumer>>
{
private final ArrayList<QueueEntry> _queueEntries;
@@ -1319,25 +1063,122 @@ public class SimpleAMQQueueTest extends QpidTestCase
_queueEntries = queueEntries;
}
- public void performAction(MessageInstance<? extends Consumer> entry)
+ public void performAction(MessageInstance<?,? extends Consumer> entry)
{
_queueEntries.add((QueueEntry) entry);
}
}
- class TestSimpleQueueEntryListFactory implements QueueEntryListFactory
+
+ public VirtualHost getVirtualHost()
{
- QueueEntryList _list;
+ return _virtualHost;
+ }
+
+ public String getQname()
+ {
+ return _qname;
+ }
+
+ public String getOwner()
+ {
+ return _owner;
+ }
+
+ public String getRoutingKey()
+ {
+ return _routingKey;
+ }
+
+ public DirectExchange getExchange()
+ {
+ return _exchange;
+ }
+
+ public MockConsumer getConsumerTarget()
+ {
+ return _consumerTarget;
+ }
+
- public QueueEntryList createQueueEntryList(AMQQueue queue)
+ static class TestSimpleQueueEntryListFactory implements QueueEntryListFactory<NonAsyncDeliverEntry, NonAsyncDeliverQueue, NonAsyncDeliverList>
+ {
+
+ @Override
+ public NonAsyncDeliverList createQueueEntryList(final NonAsyncDeliverQueue queue)
+ {
+ return new NonAsyncDeliverList(queue);
+ }
+ }
+
+ private static class NonAsyncDeliverEntry extends OrderedQueueEntry<NonAsyncDeliverEntry, NonAsyncDeliverQueue, NonAsyncDeliverList>
+ {
+
+ public NonAsyncDeliverEntry(final NonAsyncDeliverList queueEntryList)
+ {
+ super(queueEntryList);
+ }
+
+ public NonAsyncDeliverEntry(final NonAsyncDeliverList queueEntryList,
+ final ServerMessage message,
+ final long entryId)
+ {
+ super(queueEntryList, message, entryId);
+ }
+
+ public NonAsyncDeliverEntry(final NonAsyncDeliverList queueEntryList, final ServerMessage message)
+ {
+ super(queueEntryList, message);
+ }
+ }
+
+ private static class NonAsyncDeliverList extends OrderedQueueEntryList<NonAsyncDeliverEntry, NonAsyncDeliverQueue, NonAsyncDeliverList>
+ {
+
+ private static final HeadCreator<NonAsyncDeliverEntry, NonAsyncDeliverQueue, NonAsyncDeliverList> HEAD_CREATOR =
+ new HeadCreator<NonAsyncDeliverEntry, NonAsyncDeliverQueue, NonAsyncDeliverList>()
+ {
+
+ @Override
+ public NonAsyncDeliverEntry createHead(final NonAsyncDeliverList list)
+ {
+ return new NonAsyncDeliverEntry(list);
+ }
+ };
+
+ public NonAsyncDeliverList(final NonAsyncDeliverQueue queue)
+ {
+ super(queue, HEAD_CREATOR);
+ }
+
+ @Override
+ protected NonAsyncDeliverEntry createQueueEntry(final ServerMessage<?> message)
+ {
+ return new NonAsyncDeliverEntry(this,message);
+ }
+ }
+
+
+ private static class NonAsyncDeliverQueue extends SimpleAMQQueue<NonAsyncDeliverEntry, NonAsyncDeliverQueue, NonAsyncDeliverList>
+ {
+ public NonAsyncDeliverQueue(final TestSimpleQueueEntryListFactory factory, VirtualHost vhost)
{
- _list = new SimpleQueueEntryList(queue);
- return _list;
+ super(UUIDGenerator.generateRandomUUID(),
+ "testQueue",
+ false,
+ "testOwner",
+ false,
+ false,
+ vhost,
+ factory,
+ null);
}
- public QueueEntryList getQueueEntryList()
+ @Override
+ public void deliverAsync(QueueConsumer sub)
{
- return _list;
+ // do nothing, i.e prevent deliveries by the SubFlushRunner
+ // when registering the new consumers
}
}
}
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
index 11ff7ed192..36425761be 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
@@ -21,8 +21,14 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.UUID;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -30,7 +36,20 @@ import static org.mockito.Mockito.when;
public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase
{
- private SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test"));
+ private OrderedQueueEntryList queueEntryList;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ mockLogging();
+
+ StandardQueue queue = new StandardQueue(UUID.randomUUID(), "SimpleQueueEntryImplTest", false, null,false, false, mock(VirtualHost.class),null);
+
+ queueEntryList = queue.getEntries();
+
+ super.setUp();
+ }
+
public QueueEntryImpl getQueueEntryImpl(int msgId) throws AMQException
{
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
index 7add2d4d43..b6b3843ad2 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
@@ -21,20 +21,26 @@ package org.apache.qpid.server.queue;
import java.util.Collections;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Arrays;
+import java.util.UUID;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-public class SortedQueueEntryListTest extends QueueEntryListTestBase
+public class SortedQueueEntryListTest extends QueueEntryListTestBase<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>>
{
private static SelfValidatingSortedQueueEntryList _sqel;
+
public final static String keys[] = { " 73", " 18", " 11", "127", "166", "163", " 69", " 60", "191", "144",
" 17", "161", "145", "140", "157", " 47", "136", " 56", "176", " 81",
"195", " 96", " 2", " 68", "101", "141", "159", "187", "149", " 45",
@@ -62,16 +68,30 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
private final static String keysSorted[] = keys.clone();
+ private SortedQueue _testQueue;
+
@Override
protected void setUp() throws Exception
{
+ mockLogging();
+
+ // Create test list
+ _testQueue = new SortedQueue(UUID.randomUUID(), getName(), false, null, false,false, mock(VirtualHost.class), null, "KEY", new QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>()
+ {
+
+ @Override
+ public SortedQueueEntryList createQueueEntryList(final SortedQueue queue)
+ {
+ return new SelfValidatingSortedQueueEntryList(queue, "KEY");
+ }
+ });
+ _sqel = (SelfValidatingSortedQueueEntryList) _testQueue.getEntries();
+
super.setUp();
// Create result array
Arrays.sort(keysSorted);
- // Create test list
- _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY");
// Build test list
long messageId = 0L;
@@ -83,14 +103,22 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
}
+ protected void mockLogging()
+ {
+ final LogActor logActor = mock(LogActor.class);
+ when(logActor.getRootMessageLogger()).thenReturn(mock(RootMessageLogger.class));
+ CurrentActor.setDefault(logActor);
+ }
+
+
@Override
- public QueueEntryList getTestList()
+ public SortedQueueEntryList getTestList()
{
return getTestList(false);
}
@Override
- public QueueEntryList getTestList(boolean newList)
+ public SortedQueueEntryList getTestList(boolean newList)
{
if(newList)
{
@@ -117,6 +145,12 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
return generateTestMessage(1, "test value");
}
+ @Override
+ protected SortedQueue getTestQueue()
+ {
+ return _testQueue;
+ }
+
private ServerMessage generateTestMessage(final long id, final String keyValue) throws AMQException
{
final ServerMessage message = mock(ServerMessage.class);
@@ -138,7 +172,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
super.testIterator();
// Test sorted order of list
- final QueueEntryIterator<?> iter = getTestList().iterator();
+ final QueueEntryIterator<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>> iter = getTestList().iterator();
int count = 0;
while(iter.advance())
{
@@ -147,12 +181,12 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
}
}
- private Object getSortedKeyValue(QueueEntryIterator<?> iter)
+ private Object getSortedKeyValue(QueueEntryIterator<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>> iter)
{
return (iter.getNode()).getMessage().getMessageHeader().getHeader("KEY");
}
- private Long getMessageId(QueueEntryIterator<?> iter)
+ private Long getMessageId(QueueEntryIterator<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>> iter)
{
return (iter.getNode()).getMessage().getMessageNumber();
}
@@ -169,7 +203,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
_sqel.add(msg);
}
- final QueueEntryIterator<?> iter = getTestList().iterator();
+ final QueueEntryIterator<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>> iter = getTestList().iterator();
int count=0;
while(iter.advance())
{
@@ -190,12 +224,13 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
_sqel.add(msg);
}
- final QueueEntryIterator<?> iter = getTestList().iterator();
+ final QueueEntryIterator<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>> iter = getTestList().iterator();
int count=0;
while(iter.advance())
{
assertNull("Sorted queue entry value is not as expected", getSortedKeyValue(iter));
- assertEquals("Message id not as expected", Long.valueOf(count++), getMessageId(iter)); }
+ assertEquals("Message id not as expected", Long.valueOf(count++), getMessageId(iter));
+ }
}
public void testAscendingSortKeys() throws Exception
@@ -211,7 +246,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
_sqel.add(msg);
}
- final QueueEntryIterator<?> iter = getTestList().iterator();
+ final QueueEntryIterator<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>> iter = getTestList().iterator();
int count=0;
while(iter.advance())
{
@@ -234,7 +269,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
_sqel.add(msg);
}
- final QueueEntryIterator<?> iter = getTestList().iterator();
+ final QueueEntryIterator<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>> iter = getTestList().iterator();
int count=0;
while(iter.advance())
{
@@ -251,7 +286,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
ServerMessage msg = generateTestMessage(1, "A");
_sqel.add(msg);
- SortedQueueEntryImpl entry = _sqel.next(_sqel.getHead());
+ SortedQueueEntry entry = _sqel.next(_sqel.getHead());
validateEntry(entry, "A", 1);
msg = generateTestMessage(2, "B");
@@ -271,7 +306,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
ServerMessage msg = generateTestMessage(1, "B");
_sqel.add(msg);
- SortedQueueEntryImpl entry = _sqel.next(_sqel.getHead());
+ SortedQueueEntry entry = _sqel.next(_sqel.getHead());
validateEntry(entry, "B", 1);
msg = generateTestMessage(2, "A");
@@ -290,7 +325,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
ServerMessage msg = generateTestMessage(1, "A");
_sqel.add(msg);
- SortedQueueEntryImpl entry = _sqel.next(_sqel.getHead());
+ SortedQueueEntry entry = _sqel.next(_sqel.getHead());
validateEntry(entry, "A", 1);
msg = generateTestMessage(2, "C");
@@ -322,7 +357,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
ServerMessage msg = generateTestMessage(1, "B");
_sqel.add(msg);
- SortedQueueEntryImpl entry = _sqel.next(_sqel.getHead());
+ SortedQueueEntry entry = _sqel.next(_sqel.getHead());
validateEntry(entry, "B", 1);
msg = generateTestMessage(2, "D");
@@ -362,7 +397,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
validateEntry(entry, "D", 2);
}
- private void validateEntry(final SortedQueueEntryImpl entry, final String expectedSortKey, final long expectedMessageId)
+ private void validateEntry(final SortedQueueEntry entry, final String expectedSortKey, final long expectedMessageId)
{
assertEquals("Sorted queue entry value is not as expected",
expectedSortKey, entry.getMessage().getMessageHeader().getHeader("KEY"));
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
index a84dd6c249..a406c1c26f 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryImplTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
@@ -20,21 +20,41 @@
package org.apache.qpid.server.queue;
import java.util.Collections;
+import java.util.UUID;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-public class SortedQueueEntryImplTest extends QueueEntryImplTestBase
+public class SortedQueueEntryTest extends QueueEntryImplTestBase
{
public final static String keys[] = { "CCC", "AAA", "BBB" };
- private SelfValidatingSortedQueueEntryList queueEntryList = new SelfValidatingSortedQueueEntryList(new MockAMQQueue("test"),"KEY");
+ private SelfValidatingSortedQueueEntryList _queueEntryList;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ mockLogging();
+ SortedQueue queue = new SortedQueue(UUID.randomUUID(), getName(), false, null, false,false, mock(VirtualHost.class), null, "KEY", new QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>()
+ {
+
+ @Override
+ public SortedQueueEntryList createQueueEntryList(final SortedQueue queue)
+ {
+ return new SelfValidatingSortedQueueEntryList(queue, "KEY");
+ }
+ });
+ _queueEntryList = (SelfValidatingSortedQueueEntryList) queue.getEntries();
+ super.setUp();
+ }
public QueueEntryImpl getQueueEntryImpl(int msgId) throws AMQException
{
@@ -48,7 +68,7 @@ public class SortedQueueEntryImplTest extends QueueEntryImplTestBase
final MessageReference reference = mock(MessageReference.class);
when(reference.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(reference);
- return queueEntryList.add(message);
+ return _queueEntryList.add(message);
}
public void testCompareTo()
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
index ae282d5f37..c053957e2a 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
@@ -23,16 +23,21 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.Collections;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-public class SimpleQueueEntryListTest extends QueueEntryListTestBase
+public class StandardQueueEntryListTest extends QueueEntryListTestBase<StandardQueueEntry, StandardQueue, StandardQueueEntryList, QueueConsumer<?,StandardQueueEntry, StandardQueue, StandardQueueEntryList>>
{
- private SimpleQueueEntryList _sqel;
+
+ private StandardQueue _testQueue;
+ private StandardQueueEntryList _sqel;
private static final String SCAVENGE_PROP = "qpid.queue.scavenge_count";
private String oldScavengeValue = null;
@@ -41,7 +46,10 @@ public class SimpleQueueEntryListTest extends QueueEntryListTestBase
protected void setUp()
{
oldScavengeValue = System.setProperty(SCAVENGE_PROP, "9");
- _sqel = new SimpleQueueEntryList(_testQueue);
+ _testQueue = new StandardQueue(UUID.randomUUID(),getName(),false,null,false,false,mock(VirtualHost.class),
+ Collections.<String,Object>emptyMap());
+
+ _sqel = _testQueue.getEntries();
for(int i = 1; i <= 100; i++)
{
final ServerMessage message = mock(ServerMessage.class);
@@ -69,17 +77,21 @@ public class SimpleQueueEntryListTest extends QueueEntryListTestBase
}
@Override
- public QueueEntryList getTestList()
+ public StandardQueueEntryList getTestList()
{
return getTestList(false);
}
@Override
- public QueueEntryList getTestList(boolean newList)
+ public StandardQueueEntryList getTestList(boolean newList)
{
if(newList)
{
- return new SimpleQueueEntryList(_testQueue);
+ StandardQueue queue =
+ new StandardQueue(UUID.randomUUID(), getName(), false, null, false, false, mock(VirtualHost.class),
+ Collections.<String, Object>emptyMap());
+
+ return queue.getEntries();
}
else
{
@@ -107,9 +119,15 @@ public class SimpleQueueEntryListTest extends QueueEntryListTestBase
return msg;
}
+ @Override
+ protected StandardQueue getTestQueue()
+ {
+ return _testQueue;
+ }
+
public void testScavenge() throws Exception
{
- SimpleQueueEntryList sqel = new SimpleQueueEntryList(null);
+ OrderedQueueEntryList sqel = new StandardQueueEntryList(null);
ConcurrentHashMap<Integer,QueueEntry> entriesMap = new ConcurrentHashMap<Integer,QueueEntry>();
@@ -126,7 +144,7 @@ public class SimpleQueueEntryListTest extends QueueEntryListTestBase
entriesMap.put(i,bleh);
}
- SimpleQueueEntryImpl head = sqel.getHead();
+ OrderedQueueEntry head = sqel.getHead();
//We shall now delete some specific messages mid-queue that will lead to
//requiring a scavenge once the requested threshold of 9 deletes is passed
@@ -172,10 +190,10 @@ public class SimpleQueueEntryListTest extends QueueEntryListTestBase
return entry.isDeleted() && !wasDeleted;
}
- private void verifyDeletedButPresentBeforeScavenge(SimpleQueueEntryImpl head, long messageId)
+ private void verifyDeletedButPresentBeforeScavenge(OrderedQueueEntry head, long messageId)
{
//Use the head to get the initial entry in the queue
- SimpleQueueEntryImpl entry = head.getNextNode();
+ OrderedQueueEntry entry = head.getNextNode();
for(long i = 1; i < messageId ; i++)
{
@@ -186,10 +204,10 @@ public class SimpleQueueEntryListTest extends QueueEntryListTestBase
assertTrue("Entry should have been deleted", entry.isDeleted());
}
- private void verifyAllDeletedMessagedNotPresent(SimpleQueueEntryImpl head, Map<Integer,QueueEntry> remainingMessages)
+ private void verifyAllDeletedMessagedNotPresent(OrderedQueueEntry head, Map<Integer,QueueEntry> remainingMessages)
{
//Use the head to get the initial entry in the queue
- SimpleQueueEntryImpl entry = head.getNextNode();
+ OrderedQueueEntry entry = head.getNextNode();
assertNotNull("Initial entry should not have been null", entry);
@@ -211,8 +229,8 @@ public class SimpleQueueEntryListTest extends QueueEntryListTestBase
public void testGettingNextElement()
{
final int numberOfEntries = 5;
- final SimpleQueueEntryImpl[] entries = new SimpleQueueEntryImpl[numberOfEntries];
- final SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test"));
+ final OrderedQueueEntry[] entries = new OrderedQueueEntry[numberOfEntries];
+ final OrderedQueueEntryList queueEntryList = getTestList(true);
// create test entries
for(int i = 0; i < numberOfEntries; i++)
@@ -228,7 +246,7 @@ public class SimpleQueueEntryListTest extends QueueEntryListTestBase
// test getNext for not acquired entries
for(int i = 0; i < numberOfEntries; i++)
{
- final SimpleQueueEntryImpl next = entries[i].getNextValidEntry();
+ final OrderedQueueEntry next = entries[i].getNextValidEntry();
if(i < numberOfEntries - 1)
{
@@ -248,7 +266,7 @@ public class SimpleQueueEntryListTest extends QueueEntryListTestBase
entries[2].acquire();
entries[2].delete();
- SimpleQueueEntryImpl next = entries[2].getNextValidEntry();
+ OrderedQueueEntry next = entries[2].getNextValidEntry();
assertEquals("expected forth entry", entries[3], next);
next = next.getNextValidEntry();
assertEquals("expected fifth entry", entries[4], next);
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
new file mode 100644
index 0000000000..081b1d30ad
--- /dev/null
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
@@ -0,0 +1,363 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.consumer.MockConsumer;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class StandardQueueTest extends SimpleAMQQueueTestBase<StandardQueueEntry, StandardQueue, StandardQueueEntryList>
+{
+
+ public void testCreationFailsWithNoVhost()
+ {
+ try
+ {
+ setQueue(new StandardQueue(UUIDGenerator.generateRandomUUID(), getQname(), false, getOwner(), false,false, null, getArguments()));
+ assertNull("Queue was created", getQueue());
+ }
+ catch (IllegalArgumentException e)
+ {
+ assertTrue("Exception was not about missing vhost",
+ e.getMessage().contains("Host"));
+ }
+ }
+
+
+ public void testAutoDeleteQueue() throws Exception
+ {
+ getQueue().stop();
+ setQueue(new StandardQueue(UUIDGenerator.generateRandomUUID(), getQname(), false, null, true, false, getVirtualHost(), Collections.<String,Object>emptyMap()));
+ getQueue().setDeleteOnNoConsumers(true);
+
+ ServerMessage message = createMessage(25l);
+ QueueConsumer consumer =
+ getQueue().addConsumer(getConsumerTarget(), null, message.getClass(), "test",
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
+
+ getQueue().enqueue(message, null);
+ consumer.close();
+ assertTrue("Queue was not deleted when consumer was removed",
+ getQueue().isDeleted());
+ }
+
+ public void testActiveConsumerCount() throws Exception
+ {
+ final StandardQueue queue = new StandardQueue(UUIDGenerator.generateRandomUUID(), "testActiveConsumerCount", false,
+ "testOwner", false, false, getVirtualHost(), null);
+
+ //verify adding an active consumer increases the count
+ final MockConsumer consumer1 = new MockConsumer();
+ consumer1.setActive(true);
+ consumer1.setState(ConsumerTarget.State.ACTIVE);
+ assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
+ queue.addConsumer(consumer1,
+ null,
+ createMessage(-1l).getClass(),
+ "test",
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+
+ //verify adding an inactive consumer doesn't increase the count
+ final MockConsumer consumer2 = new MockConsumer();
+ consumer2.setActive(false);
+ consumer2.setState(ConsumerTarget.State.SUSPENDED);
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+ queue.addConsumer(consumer2,
+ null,
+ createMessage(-1l).getClass(),
+ "test",
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+
+ //verify behaviour in face of expected state changes:
+
+ //verify a consumer going suspended->active increases the count
+ consumer2.setState(ConsumerTarget.State.ACTIVE);
+ assertEquals("Unexpected active consumer count", 2, queue.getActiveConsumerCount());
+
+ //verify a consumer going active->suspended decreases the count
+ consumer2.setState(ConsumerTarget.State.SUSPENDED);
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+
+ //verify a consumer going suspended->closed doesn't change the count
+ consumer2.setState(ConsumerTarget.State.CLOSED);
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+
+ //verify a consumer going active->active doesn't change the count
+ consumer1.setState(ConsumerTarget.State.ACTIVE);
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+
+ consumer1.setState(ConsumerTarget.State.SUSPENDED);
+ assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
+
+ //verify a consumer going suspended->suspended doesn't change the count
+ consumer1.setState(ConsumerTarget.State.SUSPENDED);
+ assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
+
+ consumer1.setState(ConsumerTarget.State.ACTIVE);
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+
+ //verify a consumer going active->closed decreases the count
+ consumer1.setState(ConsumerTarget.State.CLOSED);
+ assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
+
+ }
+
+
+ /**
+ * Tests that entry in dequeued state are not enqueued and not delivered to consumer
+ */
+ public void testEnqueueDequeuedEntry()
+ {
+ // create a queue where each even entry is considered a dequeued
+ SimpleAMQQueue queue = new DequeuedQueue(UUIDGenerator.generateRandomUUID(), "test", false,
+ "testOwner", false, false, getVirtualHost(), null);
+ // create a consumer
+ MockConsumer consumer = new MockConsumer();
+
+ // register consumer
+ try
+ {
+ queue.addConsumer(consumer,
+ null,
+ createMessage(-1l).getClass(),
+ "test",
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
+ }
+ catch (AMQException e)
+ {
+ fail("Failure to register consumer:" + e.getMessage());
+ }
+
+ // put test messages into a queue
+ putGivenNumberOfMessages(queue, 4);
+
+ // assert received messages
+ List<MessageInstance> messages = consumer.getMessages();
+ assertEquals("Only 2 messages should be returned", 2, messages.size());
+ assertEquals("ID of first message should be 1", 1l,
+ (messages.get(0).getMessage()).getMessageNumber());
+ assertEquals("ID of second message should be 3", 3l,
+ (messages.get(1).getMessage()).getMessageNumber());
+ }
+
+ /**
+ * Tests whether dequeued entry is sent to subscriber in result of
+ * invocation of {@link SimpleAMQQueue#processQueue(QueueRunner)}
+ */
+ public void testProcessQueueWithDequeuedEntry()
+ {
+ // total number of messages to send
+ int messageNumber = 4;
+ int dequeueMessageIndex = 1;
+
+ // create queue with overridden method deliverAsync
+ StandardQueue testQueue = new StandardQueue(UUIDGenerator.generateRandomUUID(), "test",
+ false, "testOwner", false, false, getVirtualHost(), null)
+ {
+ @Override
+ public void deliverAsync(QueueConsumer sub)
+ {
+ // do nothing
+ }
+ };
+
+ // put messages
+ List<StandardQueueEntry> entries = enqueueGivenNumberOfMessages(testQueue, messageNumber);
+
+ // dequeue message
+ dequeueMessage(testQueue, dequeueMessageIndex);
+
+ // latch to wait for message receipt
+ final CountDownLatch latch = new CountDownLatch(messageNumber -1);
+
+ // create a consumer
+ MockConsumer consumer = new MockConsumer()
+ {
+ /**
+ * Send a message and decrement latch
+ * @param entry
+ * @param batch
+ */
+ public void send(MessageInstance entry, boolean batch) throws AMQException
+ {
+ super.send(entry, batch);
+ latch.countDown();
+ }
+ };
+
+ try
+ {
+ // subscribe
+ testQueue.addConsumer(consumer,
+ null,
+ entries.get(0).getMessage().getClass(),
+ "test",
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
+
+ // process queue
+ testQueue.processQueue(new QueueRunner(testQueue)
+ {
+ public void run()
+ {
+ // do nothing
+ }
+ });
+ }
+ catch (AMQException e)
+ {
+ fail("Failure to process queue:" + e.getMessage());
+ }
+ // wait up to 1 minute for message receipt
+ try
+ {
+ latch.await(1, TimeUnit.MINUTES);
+ }
+ catch (InterruptedException e1)
+ {
+ Thread.currentThread().interrupt();
+ }
+ List<MessageInstance> expected = Arrays.asList((MessageInstance) entries.get(0), entries.get(2), entries.get(3));
+ verifyReceivedMessages(expected, consumer.getMessages());
+ }
+
+
+ private static class DequeuedQueue extends SimpleAMQQueue<DequeuedQueueEntry, DequeuedQueue, DequeuedQueueEntryList>
+ {
+
+ public DequeuedQueue(final UUID id,
+ final String queueName,
+ final boolean durable,
+ final String owner,
+ final boolean autoDelete,
+ final boolean exclusive,
+ final VirtualHost virtualHost,
+ final Map<String, Object> arguments)
+ {
+ super(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, new DequeuedQueueEntryListFactory(), arguments);
+ }
+ }
+ private static class DequeuedQueueEntryListFactory implements QueueEntryListFactory<DequeuedQueueEntry, DequeuedQueue, DequeuedQueueEntryList>
+ {
+ public DequeuedQueueEntryList createQueueEntryList(DequeuedQueue queue)
+ {
+ /**
+ * Override SimpleQueueEntryList to create a dequeued
+ * entries for messages with even id
+ */
+ return new DequeuedQueueEntryList(queue);
+ }
+
+
+ }
+
+ private static class DequeuedQueueEntryList extends OrderedQueueEntryList<DequeuedQueueEntry, DequeuedQueue, DequeuedQueueEntryList>
+ {
+ private static final HeadCreator<DequeuedQueueEntry,DequeuedQueue,DequeuedQueueEntryList> HEAD_CREATOR =
+ new HeadCreator<DequeuedQueueEntry,DequeuedQueue,DequeuedQueueEntryList>()
+ {
+
+ @Override
+ public DequeuedQueueEntry createHead(final DequeuedQueueEntryList list)
+ {
+ return new DequeuedQueueEntry(list);
+ }
+ };
+
+ public DequeuedQueueEntryList(final DequeuedQueue queue)
+ {
+ super(queue, HEAD_CREATOR);
+ }
+
+ /**
+ * Entries with even message id are considered
+ * dequeued!
+ */
+ protected DequeuedQueueEntry createQueueEntry(final ServerMessage message)
+ {
+ return new DequeuedQueueEntry(this, message);
+ }
+
+
+ }
+
+ private static class DequeuedQueueEntry extends OrderedQueueEntry<DequeuedQueueEntry,DequeuedQueue,DequeuedQueueEntryList>
+ {
+
+ private final ServerMessage _message;
+
+ private DequeuedQueueEntry(final DequeuedQueueEntryList queueEntryList)
+ {
+ super(queueEntryList);
+ _message = null;
+ }
+
+ public DequeuedQueueEntry(DequeuedQueueEntryList list, final ServerMessage message)
+ {
+ super(list, message);
+ _message = message;
+ }
+
+ public boolean isDeleted()
+ {
+ return (_message.getMessageNumber() % 2 == 0);
+ }
+
+ public boolean isAvailable()
+ {
+ return !(_message.getMessageNumber() % 2 == 0);
+ }
+
+ @Override
+ public boolean acquire(QueueConsumer sub)
+ {
+ if(_message.getMessageNumber() % 2 == 0)
+ {
+ return false;
+ }
+ else
+ {
+ return super.acquire(sub);
+ }
+ }
+ }
+}
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
index 11b9bbe1b4..993e9ee4a8 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
@@ -24,9 +24,9 @@ import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MockAMQQueue;
-import org.apache.qpid.server.queue.MockQueueEntry;
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.MockMessageInstance;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -385,7 +385,7 @@ public class AutoCommitTransactionTest extends QpidTestCase
final AMQQueue queue = createTestAMQQueue(queueDurableFlags[i]);
final ServerMessage message = createTestMessage(messagePersistentFlags[i]);
- queueEntries.add(new MockQueueEntry()
+ queueEntries.add(new MockMessageInstance()
{
@Override
@@ -395,7 +395,7 @@ public class AutoCommitTransactionTest extends QpidTestCase
}
@Override
- public AMQQueue getQueue()
+ public TransactionLogResource getOwningResource()
{
return queue;
}
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
index 80e794e0ff..bdfdb55c7e 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
@@ -24,9 +24,9 @@ import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MockAMQQueue;
-import org.apache.qpid.server.queue.MockQueueEntry;
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.MockMessageInstance;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -609,7 +609,7 @@ public class LocalTransactionTest extends QpidTestCase
final AMQQueue queue = createTestAMQQueue(queueDurableFlags[i]);
final ServerMessage message = createTestMessage(messagePersistentFlags[i]);
- queueEntries.add(new MockQueueEntry()
+ queueEntries.add(new MockMessageInstance()
{
@Override
@@ -619,7 +619,7 @@ public class LocalTransactionTest extends QpidTestCase
}
@Override
- public AMQQueue getQueue()
+ public TransactionLogResource getOwningResource()
{
return queue;
}
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
index cb1fc2737d..ed1ea01108 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
@@ -44,8 +44,8 @@ import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.actors.TestLogActor;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.queue.SimpleAMQQueue;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.stats.StatisticsGatherer;
@@ -179,9 +179,9 @@ public class BrokerTestHelper
return factory.createExchange("amp.direct", "direct", false, false);
}
- public static SimpleAMQQueue createQueue(String queueName, VirtualHost virtualHost) throws AMQException
+ public static AMQQueue createQueue(String queueName, VirtualHost virtualHost) throws AMQException
{
- SimpleAMQQueue queue = (SimpleAMQQueue) virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), queueName, false, null,
+ AMQQueue queue = virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), queueName, false, null,
false, false, false, Collections.<String, Object>emptyMap());
return queue;
}
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 53022c333e..87a02b99c1 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -928,10 +928,10 @@ public class ServerSession extends Session
return getId().compareTo(o.getId());
}
- private class CheckCapacityAction<C extends Consumer> implements Action<MessageInstance<C>>
+ private class CheckCapacityAction<C extends Consumer> implements Action<MessageInstance<?,C>>
{
@Override
- public void performAction(final MessageInstance<C> entry)
+ public void performAction(final MessageInstance<?,C> entry)
{
TransactionLogResource queue = entry.getOwningResource();
if(queue instanceof CapacityChecker)
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index aa465d373f..162951f9ef 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -1189,14 +1189,14 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
- private class ImmediateAction<C extends Consumer> implements Action<MessageInstance<C>>
+ private class ImmediateAction<C extends Consumer> implements Action<MessageInstance<?,C>>
{
public ImmediateAction()
{
}
- public void performAction(MessageInstance<C> entry)
+ public void performAction(MessageInstance<?,C> entry)
{
TransactionLogResource queue = entry.getOwningResource();
@@ -1261,10 +1261,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
}
- private final class CapacityCheckAction<C extends Consumer> implements Action<MessageInstance<C>>
+ private final class CapacityCheckAction<C extends Consumer> implements Action<MessageInstance<?,C>>
{
@Override
- public void performAction(final MessageInstance<C> entry)
+ public void performAction(final MessageInstance<?,C> entry)
{
TransactionLogResource queue = entry.getOwningResource();
if(queue instanceof CapacityChecker)
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
index 281f7345ff..8aa25e8eb5 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
@@ -24,7 +24,7 @@ package org.apache.qpid.server.protocol.v0_8;
import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.queue.SimpleAMQQueue;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.util.BrokerTestHelper;
@@ -36,7 +36,7 @@ import java.util.List;
public class AcknowledgeTest extends QpidTestCase
{
private AMQChannel _channel;
- private SimpleAMQQueue _queue;
+ private AMQQueue _queue;
private MessageStore _messageStore;
private String _queueName;
@@ -79,7 +79,7 @@ public class AcknowledgeTest extends QpidTestCase
return (InternalTestProtocolSession)_channel.getProtocolSession();
}
- private SimpleAMQQueue getQueue()
+ private AMQQueue getQueue()
{
return _queue;
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
index e895f81c44..0a4bfd13f1 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
@@ -26,10 +26,8 @@ import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.SimpleAMQQueue;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -39,7 +37,7 @@ import java.util.List;
public class QueueBrowserUsesNoAckTest extends QpidTestCase
{
private AMQChannel _channel;
- private SimpleAMQQueue _queue;
+ private AMQQueue _queue;
private MessageStore _messageStore;
private String _queueName;
@@ -82,7 +80,7 @@ public class QueueBrowserUsesNoAckTest extends QpidTestCase
return (InternalTestProtocolSession)_channel.getProtocolSession();
}
- private SimpleAMQQueue getQueue()
+ private AMQQueue getQueue()
{
return _queue;
}
diff --git a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageContentServlet.java b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageContentServlet.java
index 4d85d52997..0329379713 100644
--- a/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageContentServlet.java
+++ b/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageContentServlet.java
@@ -19,9 +19,6 @@ package org.apache.qpid.server.management.plugin.servlet.rest;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java
index b408ad8ad1..b5a4166b55 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java
@@ -64,7 +64,7 @@ public class FieldTableKeyEnumeratorTest extends TestCase
}
- public void testPropertEnu()
+ public void testPropertyEnum()
{
try
{
diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java b/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
index f92a133919..de36c6e413 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -47,11 +47,10 @@ import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
-import org.apache.qpid.server.queue.AMQPriorityQueue;
+import org.apache.qpid.server.queue.PriorityQueue;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.ConflationQueue;
-import org.apache.qpid.server.queue.SimpleAMQQueue;
+import org.apache.qpid.server.queue.StandardQueue;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.BrokerTestHelper;
@@ -573,9 +572,9 @@ public class MessageStoreTest extends QpidTestCase
if (usePriority)
{
- assertEquals("Queue is no longer a Priority Queue", AMQPriorityQueue.class, queue.getClass());
+ assertEquals("Queue is no longer a Priority Queue", PriorityQueue.class, queue.getClass());
assertEquals("Priority Queue does not have set priorities",
- DEFAULT_PRIORTY_LEVEL, ((AMQPriorityQueue) queue).getPriorities());
+ DEFAULT_PRIORTY_LEVEL, ((PriorityQueue) queue).getPriorities());
}
else if (lastValueQueue)
{
@@ -584,7 +583,7 @@ public class MessageStoreTest extends QpidTestCase
}
else
{
- assertEquals("Queue is not 'simple'", SimpleAMQQueue.class, queue.getClass());
+ assertEquals("Queue is not 'simple'", StandardQueue.class, queue.getClass());
}
assertEquals("Queue owner is not as expected", queueOwner, queue.getOwner());
diff --git a/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java b/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
index 2d6943f643..3c15a45203 100644
--- a/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
@@ -26,11 +26,9 @@ import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.management.common.mbeans.ManagedBroker;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.NotificationCheckTest;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
-import org.apache.qpid.server.queue.SimpleAMQQueue;
-import org.apache.qpid.server.queue.SimpleAMQQueueTest;
+import org.apache.qpid.server.queue.StandardQueue;
import org.apache.qpid.test.client.destination.AddressBasedDestinationTest;
import org.apache.qpid.test.utils.JMXTestUtils;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -661,7 +659,7 @@ public class QueueManagementTest extends QpidBrokerTestCase
final Object messageGroupKey = "test";
final Map<String, Object> arguments = new HashMap<String, Object>(2);
arguments.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY, messageGroupKey);
- arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP, SimpleAMQQueue.SHARED_MSG_GROUP_ARG_VALUE);
+ arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP, StandardQueue.SHARED_MSG_GROUP_ARG_VALUE);
managedBroker.createNewQueue(queueName, null, true, arguments);
final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName);