diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-02-05 00:26:35 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-02-05 00:26:35 +0000 |
commit | bd6a93bc29c3b4f8d2ed572f46e020a541feba9e (patch) | |
tree | 1c553b028698d83f0a9efee7d6fcfe6bb905eafd | |
parent | 3878f34525cde97ad35f27caf81adf41ab325730 (diff) | |
download | qpid-python-bd6a93bc29c3b4f8d2ed572f46e020a541feba9e.tar.gz |
define Subscription and SubscriptionTarget in terms of MessageInstance rather than QueueEntry
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-amqp-1-0-management@1564581 13f79535-47bb-0310-9956-ffa450edef68
43 files changed, 449 insertions, 445 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java index e543cfb719..8fb011152c 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java @@ -73,7 +73,7 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction output.writeInt(records.length); for(Transaction.Record record : records) { - UUID id = record.getQueue().getId(); + UUID id = record.getResource().getId(); output.writeLong(id.getMostSignificantBits()); output.writeLong(id.getLeastSignificantBits()); output.writeLong(record.getMessage().getMessageNumber()); @@ -93,7 +93,7 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction _queueId = queueId; } - public TransactionLogResource getQueue() + public TransactionLogResource getResource() { return this; } @@ -119,9 +119,21 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction } @Override + public String getName() + { + return _queueId.toString(); + } + + @Override public UUID getId() { return _queueId; } + + @Override + public boolean isDurable() + { + return true; + } } } diff --git a/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java index 0f4cdf1078..cecd39381e 100644 --- a/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ b/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -416,10 +416,22 @@ public class BDBMessageStoreTest extends MessageStoreTest TransactionLogResource mockQueue = new TransactionLogResource() { @Override + public String getName() + { + return getId().toString(); + } + + @Override public UUID getId() { return mockQueueId; } + + @Override + public boolean isDurable() + { + return true; + } }; Transaction txn = log.newTransaction(); @@ -454,10 +466,22 @@ public class BDBMessageStoreTest extends MessageStoreTest TransactionLogResource mockQueue = new TransactionLogResource() { @Override + public String getName() + { + return getId().toString(); + } + + @Override public UUID getId() { return mockQueueId; } + + @Override + public boolean isDurable() + { + return true; + } }; Transaction txn = log.newTransaction(); @@ -511,10 +535,22 @@ public class BDBMessageStoreTest extends MessageStoreTest TransactionLogResource mockQueue = new TransactionLogResource() { @Override + public String getName() + { + return getId().toString(); + } + + @Override public UUID getId() { return mockQueueId; } + + @Override + public boolean isDurable() + { + return true; + } }; Transaction txn = log.newTransaction(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java index 39303a0715..80fa93c417 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java @@ -22,12 +22,58 @@ package org.apache.qpid.server.message; import org.apache.qpid.AMQException; +import org.apache.qpid.server.filter.Filterable; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.StateChangeListener; public interface MessageInstance { + /** + * Number of times this queue entry has been delivered. + * + * @return delivery count + */ + int getDeliveryCount(); + + void incrementDeliveryCount(); + + void decrementDeliveryCount(); + + void addStateChangeListener(StateChangeListener<QueueEntry, State> listener); + + boolean removeStateChangeListener(StateChangeListener<QueueEntry, State> listener); + + boolean acquiredBySubscription(); + + boolean isAcquiredBy(Subscription subscription); + + void setRedelivered(); + + boolean isRedelivered(); + + Subscription getDeliveredSubscription(); + + void reject(); + + boolean isRejectedBy(Subscription subscription); + + boolean getDeliveredToConsumer(); + + boolean expired() throws AMQException; + + boolean acquire(Subscription sub); + + int getMaximumDeliveryCount(); + + int routeToAlternate(Action<QueueEntry> action, ServerTransaction txn); + + Filterable asFilterable(); public static enum State { @@ -164,4 +210,6 @@ public interface MessageInstance ServerMessage getMessage(); InstanceProperties getInstanceProperties(); + + TransactionLogResource getOwningResource(); } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index 5dd1198ba8..61f38f963c 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -35,48 +35,11 @@ public interface QueueEntry extends MessageInstance, Comparable<QueueEntry> long getSize(); - boolean getDeliveredToConsumer(); - - boolean expired() throws AMQException; - - boolean acquire(Subscription sub); - - boolean acquiredBySubscription(); - boolean isAcquiredBy(Subscription subscription); - - void setRedelivered(); - - boolean isRedelivered(); - - Subscription getDeliveredSubscription(); - - void reject(); - - boolean isRejectedBy(Subscription subscription); - - int routeToAlternate(final Action<QueueEntry> action, ServerTransaction txn); - boolean isQueueDeleted(); QueueEntry getNextNode(); QueueEntry getNextValidEntry(); - void addStateChangeListener(StateChangeListener<QueueEntry, State> listener); - boolean removeStateChangeListener(StateChangeListener<QueueEntry, State> listener); - - - /** - * Number of times this queue entry has been delivered. - * - * @return delivery count - */ - int getDeliveryCount(); - - void incrementDeliveryCount(); - - void decrementDeliveryCount(); - - Filterable asFilterable(); } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 18a5db597f..ea50337b61 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -28,6 +28,7 @@ import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; @@ -456,6 +457,12 @@ public abstract class QueueEntryImpl implements QueueEntry return _deliveryCount; } + @Override + public int getMaximumDeliveryCount() + { + return getQueue().getMaximumDeliveryCount(); + } + public void incrementDeliveryCount() { _deliveryCountUpdater.incrementAndGet(this); @@ -491,6 +498,12 @@ public abstract class QueueEntryImpl implements QueueEntry return false; } + @Override + public TransactionLogResource getOwningResource() + { + return getQueue(); + } + private static class EntryInstanceProperties implements InstanceProperties { private final EnumMap<Property, Object> _properties = new EnumMap<Property, Object>(Property.class); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSubscription.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSubscription.java index 08ecf58855..f683159e12 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSubscription.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSubscription.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.SubscriptionMessages; import org.apache.qpid.server.logging.subjects.QueueLogSubject; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.MessageConverterRegistry; @@ -225,13 +226,13 @@ class QueueSubscription<T extends SubscriptionTarget> implements Subscription } @Override - public boolean wouldSuspend(final QueueEntry msg) + public boolean wouldSuspend(final MessageInstance msg) { return !_target.allocateCredit(msg.getMessage()); } @Override - public void restoreCredit(final QueueEntry queueEntry) + public void restoreCredit(final MessageInstance queueEntry) { _target.restoreCredit(queueEntry.getMessage()); } @@ -298,9 +299,9 @@ class QueueSubscription<T extends SubscriptionTarget> implements Subscription } @Override - public boolean resend(final QueueEntry entry) throws AMQException + public boolean resend(final MessageInstance entry) throws AMQException { - return getQueue().resend(entry, this); + return getQueue().resend((QueueEntry)entry, this); } final SubFlushRunner getRunner() @@ -353,7 +354,7 @@ class QueueSubscription<T extends SubscriptionTarget> implements Subscription _noLocal = noLocal; } - public final boolean hasInterest(QueueEntry entry) + public final boolean hasInterest(MessageInstance entry) { //check that the message hasn't been rejected if (entry.isRejectedBy(this)) @@ -429,7 +430,7 @@ class QueueSubscription<T extends SubscriptionTarget> implements Subscription return _createTime; } - public final QueueEntry.SubscriptionAcquiredState getOwningState() + public final MessageInstance.SubscriptionAcquiredState getOwningState() { return _owningState; } @@ -464,7 +465,7 @@ class QueueSubscription<T extends SubscriptionTarget> implements Subscription return _deliveredCount.longValue(); } - public final void send(final QueueEntry entry, final boolean batch) throws AMQException + public final void send(final MessageInstance entry, final boolean batch) throws AMQException { _deliveredCount.incrementAndGet(); _deliveredBytes.addAndGet(entry.getMessage().getSize()); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index e490ac38c1..97549a3c1e 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -1065,7 +1065,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC stmt.setString(4, "E"); for(Transaction.Record record : enqueues) { - stmt.setString(5, record.getQueue().getId().toString()); + stmt.setString(5, record.getResource().getId().toString()); stmt.setLong(6, record.getMessage().getMessageNumber()); stmt.executeUpdate(); } @@ -1076,7 +1076,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC stmt.setString(4, "D"); for(Transaction.Record record : dequeues) { - stmt.setString(5, record.getQueue().getId().toString()); + stmt.setString(5, record.getResource().getId().toString()); stmt.setLong(6, record.getMessage().getMessageNumber()); stmt.executeUpdate(); } @@ -1371,7 +1371,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } @Override - public TransactionLogResource getQueue() + public TransactionLogResource getResource() { return this; } @@ -1401,10 +1401,22 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } @Override + public String getName() + { + return _queueId.toString(); + } + + @Override public UUID getId() { return _queueId; } + + @Override + public boolean isDurable() + { + return true; + } } protected void recoverXids(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh) throws SQLException diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java index 66bcfff32b..74b91dec2d 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java @@ -70,7 +70,7 @@ public interface Transaction public static interface Record { - TransactionLogResource getQueue(); + TransactionLogResource getResource(); EnqueueableMessage getMessage(); } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java index 576dca847d..18b3125641 100755 --- a/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java @@ -24,5 +24,7 @@ import java.util.UUID; public interface TransactionLogResource { + String getName(); public UUID getId(); + boolean isDurable(); } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java index e90502f023..572b076ba2 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java @@ -23,9 +23,8 @@ package org.apache.qpid.server.subscription; import java.util.concurrent.atomic.AtomicLong; import org.apache.qpid.AMQException; import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.util.StateChangeListener; public interface Subscription @@ -64,7 +63,7 @@ public interface Subscription AMQSessionModel getSessionModel(); - QueueEntry.SubscriptionAcquiredState getOwningState(); + MessageInstance.SubscriptionAcquiredState getOwningState(); void setNoLocal(boolean noLocal); @@ -72,7 +71,7 @@ public interface Subscription boolean isSuspended(); - boolean hasInterest(QueueEntry msg); + boolean hasInterest(MessageInstance msg); boolean isClosed(); @@ -82,16 +81,16 @@ public interface Subscription void close() throws AMQException; - void send(QueueEntry entry, boolean batch) throws AMQException; + void send(MessageInstance entry, boolean batch) throws AMQException; - boolean resend(QueueEntry entry) throws AMQException; + boolean resend(MessageInstance entry) throws AMQException; void flushBatched(); void queueDeleted(); - boolean wouldSuspend(QueueEntry msg); + boolean wouldSuspend(MessageInstance msg); boolean trySendLock(); @@ -100,7 +99,7 @@ public interface Subscription void releaseSendLock(); - void restoreCredit(final QueueEntry queueEntry); + void restoreCredit(final MessageInstance queueEntry); void setStateListener(final StateChangeListener<? extends Subscription, State> listener); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java index f1d77d9d42..80298cccc8 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java @@ -21,9 +21,9 @@ package org.apache.qpid.server.subscription; import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.util.StateChangeListener; public interface SubscriptionTarget @@ -49,7 +49,7 @@ public interface SubscriptionTarget AMQSessionModel getSessionModel(); - void send(QueueEntry entry, boolean batch) throws AMQException; + void send(MessageInstance entry, boolean batch) throws AMQException; void flushBatched(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java index 1f5a4907ed..03717ed6ae 100755 --- a/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java @@ -25,12 +25,14 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQStoreException; import org.apache.qpid.server.message.EnqueueableMessage; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.Transaction; +import org.apache.qpid.server.store.TransactionLogResource; import java.util.Collection; import java.util.List; @@ -88,7 +90,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } - public void dequeue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction) + public void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction) { Transaction txn = null; try @@ -158,15 +160,15 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } } - public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction) + public void dequeue(Collection<MessageInstance> queueEntries, Action postTransactionAction) { Transaction txn = null; try { - for(QueueEntry entry : queueEntries) + for(MessageInstance entry : queueEntries) { ServerMessage message = entry.getMessage(); - BaseQueue queue = entry.getQueue(); + TransactionLogResource queue = entry.getOwningResource(); if(message.isPersistent() && queue.isDurable()) { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java b/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java index b057998456..b9d91a647b 100755 --- a/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java @@ -25,11 +25,13 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQStoreException; import org.apache.qpid.server.message.EnqueueableMessage; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.Transaction; +import org.apache.qpid.server.store.TransactionLogResource; import java.util.Collection; import java.util.List; @@ -73,7 +75,7 @@ public class AutoCommitTransaction implements ServerTransaction immediateAction.postCommit(); } - public void dequeue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction) + public void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction) { Transaction txn = null; try @@ -105,15 +107,15 @@ public class AutoCommitTransaction implements ServerTransaction } - public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction) + public void dequeue(Collection<MessageInstance> queueEntries, Action postTransactionAction) { Transaction txn = null; try { - for(QueueEntry entry : queueEntries) + for(MessageInstance entry : queueEntries) { ServerMessage message = entry.getMessage(); - BaseQueue queue = entry.getQueue(); + TransactionLogResource queue = entry.getOwningResource(); if(message.isPersistent() && queue.isDurable()) { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java b/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java index d48b09d912..238facf4b5 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java @@ -22,10 +22,12 @@ package org.apache.qpid.server.txn; import org.apache.qpid.server.message.EnqueueableMessage; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.Xid; @@ -74,7 +76,7 @@ public class DistributedTransaction implements ServerTransaction } } - public void dequeue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction) + public void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction) { if(_branch != null) { @@ -87,13 +89,13 @@ public class DistributedTransaction implements ServerTransaction } } - public void dequeue(Collection<QueueEntry> messages, Action postTransactionAction) + public void dequeue(Collection<MessageInstance> messages, Action postTransactionAction) { if(_branch != null) { - for(QueueEntry entry : messages) + for(MessageInstance entry : messages) { - _branch.dequeue(entry.getQueue(), entry.getMessage()); + _branch.dequeue(entry.getOwningResource(), entry.getMessage()); } _branch.addPostTransactionAction(postTransactionAction); } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java b/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java index 6b12862690..ada4eeb553 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java @@ -34,6 +34,7 @@ import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.Transaction; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.Xid; @@ -335,7 +336,7 @@ public class DtxBranch { if(enqueue.isDurable()) { - _transaction.enqueueMessage(enqueue.getQueue(), enqueue.getMessage()); + _transaction.enqueueMessage(enqueue.getResource(), enqueue.getMessage()); } } @@ -344,7 +345,7 @@ public class DtxBranch { if(enqueue.isDurable()) { - _transaction.dequeueMessage(enqueue.getQueue(), enqueue.getMessage()); + _transaction.dequeueMessage(enqueue.getResource(), enqueue.getMessage()); } } } @@ -356,9 +357,9 @@ public class DtxBranch } - public void dequeue(BaseQueue queue, EnqueueableMessage message) + public void dequeue(TransactionLogResource resource, EnqueueableMessage message) { - _dequeueRecords.add(new Record(queue, message)); + _dequeueRecords.add(new Record(resource, message)); } @@ -369,18 +370,18 @@ public class DtxBranch private static final class Record implements Transaction.Record { - private final BaseQueue _queue; + private final TransactionLogResource _resource; private final EnqueueableMessage _message; - public Record(BaseQueue queue, EnqueueableMessage message) + public Record(TransactionLogResource resource, EnqueueableMessage message) { - _queue = queue; + _resource = resource; _message = message; } - public BaseQueue getQueue() + public TransactionLogResource getResource() { - return _queue; + return _resource; } public EnqueueableMessage getMessage() @@ -390,7 +391,7 @@ public class DtxBranch public boolean isDurable() { - return _message.isPersistent() && _queue.isDurable(); + return _message.isPersistent() && _resource.isDurable(); } } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java index 81aabc6bd3..93482153d3 100755 --- a/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java @@ -21,7 +21,9 @@ package org.apache.qpid.server.txn; import org.apache.qpid.server.message.EnqueueableMessage; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.TransactionLogResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,7 +93,7 @@ public class LocalTransaction implements ServerTransaction _postTransactionActions.add(postTransactionAction); } - public void dequeue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction) + public void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction) { sync(); _postTransactionActions.add(postTransactionAction); @@ -118,7 +120,7 @@ public class LocalTransaction implements ServerTransaction } } - public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction) + public void dequeue(Collection<MessageInstance> queueEntries, Action postTransactionAction) { sync(); _postTransactionActions.add(postTransactionAction); @@ -126,10 +128,10 @@ public class LocalTransaction implements ServerTransaction try { - for(QueueEntry entry : queueEntries) + for(MessageInstance entry : queueEntries) { ServerMessage message = entry.getMessage(); - BaseQueue queue = entry.getQueue(); + TransactionLogResource queue = entry.getOwningResource(); if(message.isPersistent() && queue.isDurable()) { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java b/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java index 240ad154ba..3355a7ed06 100755 --- a/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java @@ -24,8 +24,9 @@ import java.util.Collection; import java.util.List; import org.apache.qpid.server.message.EnqueueableMessage; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.queue.BaseQueue; -import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.store.TransactionLogResource; /** @@ -79,14 +80,14 @@ public interface ServerTransaction * * A store operation will result only for a persistent message on a durable queue. */ - void dequeue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction); + void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction); /** * Dequeue a message(s) from queue(s) registering a post transaction action. * * Store operations will result only for a persistent messages on durable queues. */ - void dequeue(Collection<QueueEntry> messages, Action postTransactionAction); + void dequeue(Collection<MessageInstance> messages, Action postTransactionAction); /** * Enqueue a message to a queue registering a post transaction action. diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java index b01f1d1ebc..f82490ee79 100755 --- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java @@ -119,7 +119,7 @@ public class VirtualHostConfigRecoveryHandler implements } for(Transaction.Record record : enqueues) { - final AMQQueue queue = _virtualHost.getQueue(record.getQueue().getId()); + final AMQQueue queue = _virtualHost.getQueue(record.getResource().getId()); if(queue != null) { final long messageId = record.getMessage().getMessageNumber(); @@ -173,13 +173,13 @@ public class VirtualHostConfigRecoveryHandler implements StringBuilder xidString = xidAsString(id); CurrentActor.get().message(_logSubject, TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(), - record.getQueue().getId().toString())); + record.getResource().getId().toString())); } } for(Transaction.Record record : dequeues) { - final AMQQueue queue = _virtualHost.getQueue(record.getQueue().getId()); + final AMQQueue queue = _virtualHost.getQueue(record.getResource().getId()); if(queue != null) { final long messageId = record.getMessage().getMessageNumber(); @@ -223,7 +223,7 @@ public class VirtualHostConfigRecoveryHandler implements StringBuilder xidString = xidAsString(id); CurrentActor.get().message(_logSubject, TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(), - record.getQueue().getId().toString())); + record.getResource().getId().toString())); } } @@ -312,10 +312,22 @@ public class VirtualHostConfigRecoveryHandler implements new TransactionLogResource() { @Override + public String getName() + { + return "<<UNKNOWN>>"; + } + + @Override public UUID getId() { return queueId; } + + @Override + public boolean isDurable() + { + return false; + } }; txn.dequeueMessage(mockQueue, new DummyMessage(messageId)); txn.commitTranAsync(); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java index 1947a4e3b6..0d38b7002a 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java @@ -25,6 +25,7 @@ import junit.framework.AssertionFailedError; import org.apache.qpid.AMQException; import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import java.util.ArrayList; @@ -68,7 +69,7 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest queue.registerSubscription(getSubscription(), null, null, "test", EnumSet.noneOf(Subscription.Option.class)); Thread.sleep(150); - ArrayList<QueueEntry> msgs = getSubscription().getMessages(); + ArrayList<MessageInstance> msgs = getSubscription().getMessages(); try { assertEquals(1L, msgs.get(0).getMessage().getMessageNumber()); @@ -87,7 +88,7 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest { // Show message order on failure. int index = 1; - for (QueueEntry qe : msgs) + for (MessageInstance qe : msgs) { System.err.println(index + ":" + qe.getMessage().getMessageNumber()); index++; diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java index dbc04bc0cd..e74bc992a2 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java @@ -25,6 +25,7 @@ import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; @@ -45,6 +46,12 @@ public class MockQueueEntry implements QueueEntry return false; } + @Override + public int getMaximumDeliveryCount() + { + return 0; + } + public boolean acquiredBySubscription() { return false; @@ -225,4 +232,10 @@ public class MockQueueEntry implements QueueEntry { return InstanceProperties.EMPTY; } + + @Override + public TransactionLogResource getOwningResource() + { + return getQueue(); + } } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index db39eb80a9..51ae822b2e 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -40,6 +40,7 @@ import org.apache.qpid.AMQSecurityException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.UUIDGenerator; @@ -206,7 +207,8 @@ public class SimpleAMQQueueTest extends QpidTestCase EnumSet.noneOf(Subscription.Option.class)); Thread.sleep(150); assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage()); - assertNull("There should be no releasedEntry after an enqueue", _subscription.getQueueContext().getReleasedEntry()); + assertNull("There should be no releasedEntry after an enqueue", + _subscription.getQueueContext().getReleasedEntry()); } /** @@ -222,7 +224,8 @@ public class SimpleAMQQueueTest extends QpidTestCase EnumSet.noneOf(Subscription.Option.class)); Thread.sleep(150); assertEquals(messageB, _subscription.getQueueContext().getLastSeenEntry().getMessage()); - assertNull("There should be no releasedEntry after enqueues", _subscription.getQueueContext().getReleasedEntry()); + assertNull("There should be no releasedEntry after enqueues", + _subscription.getQueueContext().getReleasedEntry()); } /** @@ -273,7 +276,8 @@ public class SimpleAMQQueueTest extends QpidTestCase assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered()); assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered()); assertFalse("Redelivery flag should remain be unset",queueEntries.get(2).isRedelivered()); - assertNull("releasedEntry should be cleared after requeue processed", _subscription.getQueueContext().getReleasedEntry()); + assertNull("releasedEntry should be cleared after requeue processed", + _subscription.getQueueContext().getReleasedEntry()); } /** @@ -321,7 +325,8 @@ public class SimpleAMQQueueTest extends QpidTestCase assertTrue("Expecting the queue entry to be now expired", queueEntries.get(0).expired()); assertEquals("Total number of messages sent should not have changed", 1, _subscriptionTarget.getMessages().size()); assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); - assertNull("releasedEntry should be cleared after requeue processed", _subscription.getQueueContext().getReleasedEntry()); + assertNull("releasedEntry should be cleared after requeue processed", + _subscription.getQueueContext().getReleasedEntry()); } @@ -375,7 +380,8 @@ public class SimpleAMQQueueTest extends QpidTestCase assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered()); assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered()); assertTrue("Redelivery flag should now be set",queueEntries.get(2).isRedelivered()); - assertNull("releasedEntry should be cleared after requeue processed", _subscription.getQueueContext().getReleasedEntry()); + assertNull("releasedEntry should be cleared after requeue processed", + _subscription.getQueueContext().getReleasedEntry()); } @@ -430,8 +436,10 @@ public class SimpleAMQQueueTest extends QpidTestCase assertEquals("Unexpected total number of messages sent to both subscriptions after release", 3, target1.getMessages().size() + target2.getMessages().size()); - assertNull("releasedEntry should be cleared after requeue processed", subscription1.getQueueContext().getReleasedEntry()); - assertNull("releasedEntry should be cleared after requeue processed", subscription2.getQueueContext().getReleasedEntry()); + assertNull("releasedEntry should be cleared after requeue processed", + subscription1.getQueueContext().getReleasedEntry()); + assertNull("releasedEntry should be cleared after requeue processed", + subscription2.getQueueContext().getReleasedEntry()); } public void testExclusiveConsumer() throws AMQException @@ -726,9 +734,9 @@ public class SimpleAMQQueueTest extends QpidTestCase }); // check expected messages delivered to correct consumers - verifyReceivedMessages(Arrays.asList(msg1,msg2,msg3), sub1.getMessages()); - verifyReceivedMessages(Collections.singletonList(msg4), sub2.getMessages()); - verifyReceivedMessages(Collections.singletonList(msg5), sub3.getMessages()); + verifyReceivedMessages(Arrays.asList((MessageInstance)msg1,msg2,msg3), sub1.getMessages()); + verifyReceivedMessages(Collections.singletonList((MessageInstance)msg4), sub2.getMessages()); + verifyReceivedMessages(Collections.singletonList((MessageInstance)msg5), sub3.getMessages()); } /** @@ -919,7 +927,7 @@ public class SimpleAMQQueueTest extends QpidTestCase * @param entry * @param batch */ - public void send(QueueEntry entry, boolean batch) throws AMQException + public void send(MessageInstance entry, boolean batch) throws AMQException { super.send(entry, batch); latch.countDown(); @@ -953,7 +961,7 @@ public class SimpleAMQQueueTest extends QpidTestCase { Thread.currentThread().interrupt(); } - List<QueueEntry> expected = Arrays.asList(entries.get(0), entries.get(2), entries.get(3)); + List<MessageInstance> expected = Arrays.asList((MessageInstance)entries.get(0), entries.get(2), entries.get(3)); verifyReceivedMessages(expected, subscription.getMessages()); } @@ -1027,7 +1035,7 @@ public class SimpleAMQQueueTest extends QpidTestCase putGivenNumberOfMessages(queue, 4); // assert received messages - List<QueueEntry> messages = subscription.getMessages(); + List<MessageInstance> messages = subscription.getMessages(); assertEquals("Only 2 messages should be returned", 2, messages.size()); assertEquals("ID of first message should be 1", 1l, (messages.get(0).getMessage()).getMessageNumber()); @@ -1222,13 +1230,13 @@ public class SimpleAMQQueueTest extends QpidTestCase return entriesList; } - private void verifyReceivedMessages(List<QueueEntry> expected, - List<QueueEntry> delivered) + private void verifyReceivedMessages(List<MessageInstance> expected, + List<MessageInstance> delivered) { assertEquals("Consumer did not receive the expected number of messages", expected.size(), delivered.size()); - for (QueueEntry msg : expected) + for (MessageInstance msg : expected) { assertTrue("Consumer did not receive msg: " + msg.getMessage().getMessageNumber(), delivered.contains(msg)); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java index 3dfe057285..d3ee938586 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java @@ -464,7 +464,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest } @Override - public TransactionLogResource getQueue() + public TransactionLogResource getResource() { return _queue; } @@ -505,7 +505,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest { return false; } - if (_queue == null && other.getQueue() != null) + if (_queue == null && other.getResource() != null) { return false; } @@ -513,7 +513,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest { return false; } - return _queue.getId().equals(other.getQueue().getId()); + return _queue.getId().equals(other.getResource().getId()); } } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java b/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java index 121c380736..7a4f92f0ca 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java @@ -154,6 +154,12 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple return _transactionResource; } + @Override + public boolean isDurable() + { + return true; + } + private static class TestMessage implements EnqueueableMessage { private final StoredMessage<?> _handle; diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java index cc5235a420..2ee72b9a36 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java @@ -28,6 +28,7 @@ import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.filter.SimpleFilterManager; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; @@ -54,7 +55,7 @@ public class MockSubscription implements SubscriptionTarget private AMQQueue queue = null; private StateChangeListener<SubscriptionTarget, State> _listener = null; private State _state = State.ACTIVE; - private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>(); + private ArrayList<MessageInstance> messages = new ArrayList<MessageInstance>(); private final Lock _stateChangeLock = new ReentrantLock(); private static final AtomicLong idGenerator = new AtomicLong(0); @@ -156,7 +157,7 @@ public class MockSubscription implements SubscriptionTarget { } - public void send(QueueEntry entry, boolean batch) throws AMQException + public void send(MessageInstance entry, boolean batch) throws AMQException { if (messages.contains(entry)) { @@ -203,7 +204,7 @@ public class MockSubscription implements SubscriptionTarget _listener = listener; } - public ArrayList<QueueEntry> getMessages() + public ArrayList<MessageInstance> getMessages() { return messages; } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java index 3c66a4c94b..11b9bbe1b4 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.txn; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MockAMQQueue; @@ -47,7 +48,7 @@ public class AutoCommitTransactionTest extends QpidTestCase private MessageStore _transactionLog; private AMQQueue _queue; private List<AMQQueue> _queues; - private Collection<QueueEntry> _queueEntries; + private Collection<MessageInstance> _queueEntries; private ServerMessage _message; private MockAction _action; private MockStoreTransaction _storeTransaction; @@ -373,9 +374,9 @@ public class AutoCommitTransactionTest extends QpidTestCase assertFalse("Rollback action must be fired", _action.isRollbackActionFired()); } - private Collection<QueueEntry> createTestQueueEntries(boolean[] queueDurableFlags, boolean[] messagePersistentFlags) + private Collection<MessageInstance> createTestQueueEntries(boolean[] queueDurableFlags, boolean[] messagePersistentFlags) { - Collection<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); + Collection<MessageInstance> queueEntries = new ArrayList<MessageInstance>(); assertTrue("Boolean arrays must be the same length", queueDurableFlags.length == messagePersistentFlags.length); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java index f3f5e00346..80e794e0ff 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.txn; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MockAMQQueue; @@ -46,7 +47,7 @@ public class LocalTransactionTest extends QpidTestCase private AMQQueue _queue; private List<AMQQueue> _queues; - private Collection<QueueEntry> _queueEntries; + private Collection<MessageInstance> _queueEntries; private ServerMessage _message; private MockAction _action1; private MockAction _action2; @@ -597,9 +598,9 @@ public class LocalTransactionTest extends QpidTestCase assertEquals("Transaction update time should be reset after rollback", 0, _transaction.getTransactionUpdateTime()); } - private Collection<QueueEntry> createTestQueueEntries(boolean[] queueDurableFlags, boolean[] messagePersistentFlags) + private Collection<MessageInstance> createTestQueueEntries(boolean[] queueDurableFlags, boolean[] messagePersistentFlags) { - Collection<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); + Collection<MessageInstance> queueEntries = new ArrayList<MessageInstance>(); assertTrue("Boolean arrays must be the same length", queueDurableFlags.length == messagePersistentFlags.length); diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java index 04510064de..b7788911c7 100755 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v0_10; import org.apache.log4j.Logger; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.queue.QueueEntry; @@ -30,10 +31,10 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi private static final Logger _logger = Logger.getLogger(ExplicitAcceptDispositionChangeListener.class); - private final QueueEntry _entry; + private final MessageInstance _entry; private final SubscriptionTarget_0_10 _target; - public ExplicitAcceptDispositionChangeListener(QueueEntry entry, SubscriptionTarget_0_10 target) + public ExplicitAcceptDispositionChangeListener(MessageInstance entry, SubscriptionTarget_0_10 target) { _entry = entry; _target = target; diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java index 0cdced728a..0c238c4d55 100755 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v0_10; import org.apache.log4j.Logger; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.queue.QueueEntry; class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener @@ -29,10 +30,10 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi private static final Logger _logger = Logger.getLogger(ImplicitAcceptDispositionChangeListener.class); - private final QueueEntry _entry; + private final MessageInstance _entry; private SubscriptionTarget_0_10 _target; - public ImplicitAcceptDispositionChangeListener(QueueEntry entry, SubscriptionTarget_0_10 target) + public ImplicitAcceptDispositionChangeListener(MessageInstance entry, SubscriptionTarget_0_10 target) { _entry = entry; _target = target; diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java index 9b5272d413..71ad60c7b8 100755 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java @@ -21,17 +21,18 @@ package org.apache.qpid.server.protocol.v0_10; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.transport.Method; public class MessageAcceptCompletionListener implements Method.CompletionListener { private final SubscriptionTarget_0_10 _sub; - private final QueueEntry _entry; + private final MessageInstance _entry; private final ServerSession _session; private boolean _restoreCredit; - public MessageAcceptCompletionListener(SubscriptionTarget_0_10 sub, ServerSession session, QueueEntry entry, boolean restoreCredit) + public MessageAcceptCompletionListener(SubscriptionTarget_0_10 sub, ServerSession session, MessageInstance entry, boolean restoreCredit) { super(); _sub = sub; diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 098ddc2fae..73263bd931 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -55,6 +55,7 @@ import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.subjects.ChannelLogSubject; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; @@ -399,9 +400,9 @@ public class ServerSession extends Session // Broker shouldn't block awaiting close - thus do override this method to do nothing } - public void acknowledge(final SubscriptionTarget_0_10 sub, final QueueEntry entry) + public void acknowledge(final SubscriptionTarget_0_10 sub, final MessageInstance entry) { - _transaction.dequeue(entry.getQueue(), entry.getMessage(), + _transaction.dequeue(entry.getOwningResource(), entry.getMessage(), new ServerTransaction.Action() { diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java index aee7acc99b..304b32b250 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java @@ -27,11 +27,13 @@ import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.ChannelMessages; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.subscription.AbstractSubscriptionTarget; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.AutoCommitTransaction; @@ -193,7 +195,7 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen private final AddMessageDispositionListenerAction _postIdSettingAction; - public void send(final QueueEntry entry, boolean batch) throws AMQException + public void send(final MessageInstance entry, boolean batch) throws AMQException { ServerMessage serverMsg = entry.getMessage(); @@ -275,7 +277,7 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen { public void onComplete(Method method) { - deferredAddCredit(1, entry.getSize()); + deferredAddCredit(1, entry.getMessage().getSize()); } }); } @@ -309,10 +311,10 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen } - void recordUnacknowledged(QueueEntry entry) + void recordUnacknowledged(MessageInstance entry) { _unacknowledgedCount.incrementAndGet(); - _unacknowledgedBytes.addAndGet(entry.getSize()); + _unacknowledgedBytes.addAndGet(entry.getMessage().getSize()); } private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit) @@ -334,10 +336,10 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen } } - private void forceDequeue(final QueueEntry entry, final boolean restoreCredit) + private void forceDequeue(final MessageInstance entry, final boolean restoreCredit) { AutoCommitTransaction dequeueTxn = new AutoCommitTransaction(_session.getVirtualHost().getMessageStore()); - dequeueTxn.dequeue(entry.getQueue(), entry.getMessage(), + dequeueTxn.dequeue(entry.getOwningResource(), entry.getMessage(), new ServerTransaction.Action() { public void postCommit() @@ -356,7 +358,7 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen }); } - void reject(final QueueEntry entry) + void reject(final MessageInstance entry) { entry.setRedelivered(); entry.routeToAlternate(null, null); @@ -366,7 +368,7 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen } } - void release(final QueueEntry entry, final boolean setRedelivered) + void release(final MessageInstance entry, final boolean setRedelivered) { if (setRedelivered) { @@ -388,7 +390,7 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen } } - protected void sendToDLQOrDiscard(QueueEntry entry) + protected void sendToDLQOrDiscard(MessageInstance entry) { final LogActor logActor = CurrentActor.get(); final ServerMessage msg = entry.getMessage(); @@ -405,26 +407,30 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen if (requeues == 0) { - final AMQQueue queue = entry.getQueue(); - final Exchange alternateExchange = queue.getAlternateExchange(); - - if(alternateExchange != null) - { - logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), - alternateExchange.getName())); - } - else + TransactionLogResource owningResource = entry.getOwningResource(); + if(owningResource instanceof AMQQueue) { - logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), - queue.getName(), - msg.getRoutingKey())); + final AMQQueue queue = (AMQQueue)owningResource; + final Exchange alternateExchange = queue.getAlternateExchange(); + + if(alternateExchange != null) + { + logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), + alternateExchange.getName())); + } + else + { + logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), + queue.getName(), + msg.getRoutingKey())); + } } } } - private boolean isMaxDeliveryLimitReached(QueueEntry entry) + private boolean isMaxDeliveryLimitReached(MessageInstance entry) { - final int maxDeliveryLimit = entry.getQueue().getMaximumDeliveryCount(); + final int maxDeliveryLimit = entry.getMaximumDeliveryCount(); return (maxDeliveryLimit > 0 && entry.getDeliveryCount() >= maxDeliveryLimit); } @@ -519,12 +525,12 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen return _stopped.get(); } - public void acknowledge(QueueEntry entry) + public void acknowledge(MessageInstance entry) { // TODO Fix Store Context / cleanup if(entry.isAcquiredBy(getSubscription())) { - _unacknowledgedBytes.addAndGet(-entry.getSize()); + _unacknowledgedBytes.addAndGet(-entry.getMessage().getSize()); _unacknowledgedCount.decrementAndGet(); entry.delete(); } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 516d6e48ff..ce1a132973 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -56,6 +56,7 @@ import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.logging.subjects.ChannelLogSubject; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; @@ -67,7 +68,9 @@ import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.subscription.SubscriptionTarget; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor; @@ -143,7 +146,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F private volatile boolean _rollingBack; private static final Runnable NULL_TASK = new Runnable() { public void run() {} }; - private List<QueueEntry> _resendList = new ArrayList<QueueEntry>(); + private List<MessageInstance> _resendList = new ArrayList<MessageInstance>(); private static final AMQShortString IMMEDIATE_DELIVERY_REPLY_TEXT = new AMQShortString("Immediate delivery is not possible."); private long _createTime = System.currentTimeMillis(); @@ -673,22 +676,13 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F * delivery tag) * @param subscription The consumer that is to acknowledge this message. */ - public void addUnacknowledgedMessage(QueueEntry entry, long deliveryTag, Subscription subscription) + public void addUnacknowledgedMessage(MessageInstance entry, long deliveryTag, Subscription subscription) { if (_logger.isDebugEnabled()) { - if (entry.getQueue() == null) - { - _logger.debug("Adding unacked message with a null queue:" + entry); - } - else - { - if (_logger.isDebugEnabled()) - { _logger.debug(debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag - + ") with a queue(" + entry.getQueue() + ") for " + subscription); - } - } + + ") for " + subscription); + } _unacknowledgedMessageMap.add(deliveryTag, entry); @@ -711,7 +705,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F public void requeue() throws AMQException { // we must create a new map since all the messages will get a new delivery tag when they are redelivered - Collection<QueueEntry> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages(); + Collection<MessageInstance> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages(); if (!messagesToBeDelivered.isEmpty()) { @@ -722,21 +716,13 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } - for (QueueEntry unacked : messagesToBeDelivered) + for (MessageInstance unacked : messagesToBeDelivered) { - if (!unacked.isQueueDeleted()) - { - // Mark message redelivered - unacked.setRedelivered(); - - // Ensure message is released for redelivery - unacked.release(); + // Mark message redelivered + unacked.setRedelivered(); - } - else - { - unacked.delete(); - } + // Ensure message is released for redelivery + unacked.release(); } } @@ -750,7 +736,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F */ public void requeue(long deliveryTag) throws AMQException { - QueueEntry unacked = _unacknowledgedMessageMap.remove(deliveryTag); + MessageInstance unacked = _unacknowledgedMessageMap.remove(deliveryTag); if (unacked != null) { @@ -758,20 +744,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F unacked.setRedelivered(); // Ensure message is released for redelivery - if (!unacked.isQueueDeleted()) - { - - // Ensure message is released for redelivery - unacked.release(); + unacked.release(); - } - else - { - _logger.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked - + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message."); - - unacked.delete(); - } } else { @@ -784,10 +758,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F public boolean isMaxDeliveryCountEnabled(final long deliveryTag) { - final QueueEntry queueEntry = _unacknowledgedMessageMap.get(deliveryTag); + final MessageInstance queueEntry = _unacknowledgedMessageMap.get(deliveryTag); if (queueEntry != null) { - final int maximumDeliveryCount = queueEntry.getQueue().getMaximumDeliveryCount(); + final int maximumDeliveryCount = queueEntry.getMaximumDeliveryCount(); return maximumDeliveryCount > 0; } @@ -796,10 +770,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F public boolean isDeliveredTooManyTimes(final long deliveryTag) { - final QueueEntry queueEntry = _unacknowledgedMessageMap.get(deliveryTag); + final MessageInstance queueEntry = _unacknowledgedMessageMap.get(deliveryTag); if (queueEntry != null) { - final int maximumDeliveryCount = queueEntry.getQueue().getMaximumDeliveryCount(); + final int maximumDeliveryCount = queueEntry.getMaximumDeliveryCount(); final int numDeliveries = queueEntry.getDeliveryCount(); return maximumDeliveryCount != 0 && numDeliveries >= maximumDeliveryCount; } @@ -818,8 +792,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F { - final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>(); - final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>(); + final Map<Long, MessageInstance> msgToRequeue = new LinkedHashMap<Long, MessageInstance>(); + final Map<Long, MessageInstance> msgToResend = new LinkedHashMap<Long, MessageInstance>(); if (_logger.isDebugEnabled()) { @@ -831,9 +805,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F // and those that don't to be requeued. _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue, - msgToResend, - requeue, - _messageStore)); + msgToResend + )); // Process Messages to Resend @@ -849,9 +822,9 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } } - for (Map.Entry<Long, QueueEntry> entry : msgToResend.entrySet()) + for (Map.Entry<Long, MessageInstance> entry : msgToResend.entrySet()) { - QueueEntry message = entry.getValue(); + MessageInstance message = entry.getValue(); long deliveryTag = entry.getKey(); //Amend the delivery counter as the client hasn't seen these messages yet. @@ -877,9 +850,9 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } // Process Messages to Requeue at the front of the queue - for (Map.Entry<Long, QueueEntry> entry : msgToRequeue.entrySet()) + for (Map.Entry<Long, MessageInstance> entry : msgToRequeue.entrySet()) { - QueueEntry message = entry.getValue(); + MessageInstance message = entry.getValue(); long deliveryTag = entry.getKey(); //Amend the delivery counter as the client hasn't seen these messages yet. @@ -905,11 +878,11 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F */ public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException { - Collection<QueueEntry> ackedMessages = getAckedMessages(deliveryTag, multiple); + Collection<MessageInstance> ackedMessages = getAckedMessages(deliveryTag, multiple); _transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages)); } - private Collection<QueueEntry> getAckedMessages(long deliveryTag, boolean multiple) + private Collection<MessageInstance> getAckedMessages(long deliveryTag, boolean multiple) { return _unacknowledgedMessageMap.acknowledge(deliveryTag, multiple); @@ -1077,7 +1050,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F postRollbackTask.run(); - for(QueueEntry entry : _resendList) + for(MessageInstance entry : _resendList) { Subscription sub = entry.getDeliveredSubscription(); if(sub == null || sub.isClosed()) @@ -1152,7 +1125,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod() { - public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag) + public void recordMessageDelivery(final Subscription sub, final MessageInstance entry, final long deliveryTag) { addUnacknowledgedMessage(entry, deliveryTag, sub); } @@ -1288,9 +1261,9 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F private class MessageAcknowledgeAction implements ServerTransaction.Action { - private final Collection<QueueEntry> _ackedMessages; + private final Collection<MessageInstance> _ackedMessages; - public MessageAcknowledgeAction(Collection<QueueEntry> ackedMessages) + public MessageAcknowledgeAction(Collection<MessageInstance> ackedMessages) { _ackedMessages = ackedMessages; } @@ -1299,7 +1272,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F { try { - for(QueueEntry entry : _ackedMessages) + for(MessageInstance entry : _ackedMessages) { entry.delete(); } @@ -1322,7 +1295,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F { try { - for(QueueEntry entry : _ackedMessages) + for(MessageInstance entry : _ackedMessages) { entry.release(); } @@ -1490,7 +1463,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F public void deadLetter(long deliveryTag) throws AMQException { final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap(); - final QueueEntry rejectedQueueEntry = unackedMap.remove(deliveryTag); + final MessageInstance rejectedQueueEntry = unackedMap.remove(deliveryTag); if (rejectedQueueEntry == null) { @@ -1499,6 +1472,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F else { final ServerMessage msg = rejectedQueueEntry.getMessage(); + final Subscription sub = rejectedQueueEntry.getDeliveredSubscription(); int requeues = rejectedQueueEntry.routeToAlternate(new Action<QueueEntry>() { @@ -1512,23 +1486,28 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F if(requeues == 0) { - final AMQQueue queue = rejectedQueueEntry.getQueue(); - final Exchange altExchange = queue.getAlternateExchange(); - - if (altExchange == null) + final TransactionLogResource owningResource = rejectedQueueEntry.getOwningResource(); + if(owningResource instanceof AMQQueue) { - _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag); - _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey())); + final AMQQueue queue = (AMQQueue) owningResource; - } - else - { - _logger.debug( - "Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " - + deliveryTag); - _actor.message(_logSubject, - ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName())); + final Exchange altExchange = queue.getAlternateExchange(); + + if (altExchange == null) + { + _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag); + _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey())); + + } + else + { + _logger.debug( + "Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + + deliveryTag); + _actor.message(_logSubject, + ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName())); + } } } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java index 060aebdd65..06c1d79439 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.protocol.v0_8; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.subscription.Subscription; @@ -35,26 +36,20 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor { private static final Logger _log = Logger.getLogger(ExtractResendAndRequeue.class); - private final Map<Long, QueueEntry> _msgToRequeue; - private final Map<Long, QueueEntry> _msgToResend; - private final boolean _requeueIfUnableToResend; + private final Map<Long, MessageInstance> _msgToRequeue; + private final Map<Long, MessageInstance> _msgToResend; private final UnacknowledgedMessageMap _unacknowledgedMessageMap; - private final MessageStore _transactionLog; public ExtractResendAndRequeue(UnacknowledgedMessageMap unacknowledgedMessageMap, - Map<Long, QueueEntry> msgToRequeue, - Map<Long, QueueEntry> msgToResend, - boolean requeueIfUnableToResend, - MessageStore txnLog) + Map<Long, MessageInstance> msgToRequeue, + Map<Long, MessageInstance> msgToResend) { _unacknowledgedMessageMap = unacknowledgedMessageMap; _msgToRequeue = msgToRequeue; _msgToResend = msgToResend; - _requeueIfUnableToResend = requeueIfUnableToResend; - _transactionLog = txnLog; } - public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException + public boolean callback(final long deliveryTag, MessageInstance message) throws AMQException { message.setRedelivered(); @@ -73,58 +68,13 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor } else { - // Message has no consumer tag, so was "delivered" to a GET - // or consumer no longer registered - // cannot resend, so re-queue. - if (!message.isQueueDeleted()) - { - if (_requeueIfUnableToResend) - { - _msgToRequeue.put(deliveryTag, message); - } - else - { - - dequeueEntry(message); - _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message); - } - } - else - { - dequeueEntry(message); - _log.warn("Message.queue is null and no DeadLetter Queue so dropping message:" + message); - } + _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message); } // false means continue processing return false; } - - private void dequeueEntry(final QueueEntry node) - { - ServerTransaction txn = new AutoCommitTransaction(_transactionLog); - dequeueEntry(node, txn); - } - - private void dequeueEntry(final QueueEntry node, ServerTransaction txn) - { - txn.dequeue(node.getQueue(), node.getMessage(), - new ServerTransaction.Action() - { - - public void postCommit() - { - node.delete(); - } - - public void onRollback() - { - - } - }); - } - public void visitComplete() { _unacknowledgedMessageMap.clear(); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java index 5ba0a2f893..89a5aa55c1 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java @@ -20,10 +20,10 @@ */ package org.apache.qpid.server.protocol.v0_8; -import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.subscription.Subscription; public interface RecordDeliveryMethod { - void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag); + void recordMessageDelivery(final Subscription sub, final MessageInstance entry, final long deliveryTag); } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java index a9d131a8d2..9a6cec87bd 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java @@ -27,6 +27,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -102,7 +103,7 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget * @throws org.apache.qpid.AMQException */ @Override - public void send(QueueEntry entry, boolean batch) throws AMQException + public void send(MessageInstance entry, boolean batch) throws AMQException { // We don't decrement the reference here as we don't want to consume the message // but we do want to send it to the client. @@ -165,7 +166,7 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget * @throws org.apache.qpid.AMQException */ @Override - public void send(QueueEntry entry, boolean batch) throws AMQException + public void send(MessageInstance entry, boolean batch) throws AMQException { // if we do not need to wait for client acknowledgements // we can decrement the reference count immediately. @@ -176,7 +177,7 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget // The send may of course still fail, in which case, as // the message is unacked, it will be lost. - _txn.dequeue(entry.getQueue(), entry.getMessage(), NOOP); + _txn.dequeue(entry.getOwningResource(), entry.getMessage(), NOOP); ServerMessage message = entry.getMessage(); MessageReference ref = message.newReference(); @@ -281,7 +282,7 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget * @throws org.apache.qpid.AMQException */ @Override - public void send(QueueEntry entry, boolean batch) throws AMQException + public void send(MessageInstance entry, boolean batch) throws AMQException { @@ -492,7 +493,7 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget } - protected void recordMessageDelivery(final QueueEntry entry, final long deliveryTag) + protected void recordMessageDelivery(final MessageInstance entry, final long deliveryTag) { _recordMethod.recordMessageDelivery(getSubscription(),entry,deliveryTag); } @@ -520,9 +521,9 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget _channel.getProtocolSession().flushBatched(); } - protected void addUnacknowledgedMessage(QueueEntry entry) + protected void addUnacknowledgedMessage(MessageInstance entry) { - final long size = entry.getSize(); + final long size = entry.getMessage().getSize(); _unacknowledgedBytes.addAndGet(size); _unacknowledgedCount.incrementAndGet(); entry.addStateChangeListener(new StateChangeListener<QueueEntry, QueueEntry.State>() diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java index 1d41bcdcf4..fcbbadd507 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.protocol.v0_8; import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.queue.QueueEntry; import java.util.Collection; @@ -36,24 +37,24 @@ public interface UnacknowledgedMessageMap *@param message the message being iterated over @return true to stop iteration, false to continue * @throws AMQException */ - boolean callback(final long deliveryTag, QueueEntry message) throws AMQException; + boolean callback(final long deliveryTag, MessageInstance message) throws AMQException; void visitComplete(); } void visit(Visitor visitor) throws AMQException; - void add(long deliveryTag, QueueEntry message); + void add(long deliveryTag, MessageInstance message); - QueueEntry remove(long deliveryTag); + MessageInstance remove(long deliveryTag); - Collection<QueueEntry> cancelAllMessages(); + Collection<MessageInstance> cancelAllMessages(); int size(); void clear(); - QueueEntry get(long deliveryTag); + MessageInstance get(long deliveryTag); /** * Get the set of delivery tags that are outstanding. @@ -62,7 +63,7 @@ public interface UnacknowledgedMessageMap */ Set<Long> getDeliveryTags(); - Collection<QueueEntry> acknowledge(long deliveryTag, boolean multiple); + Collection<MessageInstance> acknowledge(long deliveryTag, boolean multiple); } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java index 17b2c7b985..8d70e769d3 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.protocol.v0_8; import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.queue.QueueEntry; import java.util.Collection; @@ -34,7 +35,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap private long _unackedSize; - private Map<Long, QueueEntry> _map; + private Map<Long, MessageInstance> _map; private long _lastDeliveryTag; @@ -43,10 +44,10 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap public UnacknowledgedMessageMapImpl(int prefetchLimit) { _prefetchLimit = prefetchLimit; - _map = new LinkedHashMap<Long, QueueEntry>(prefetchLimit); + _map = new LinkedHashMap<Long, MessageInstance>(prefetchLimit); } - public void collect(long deliveryTag, boolean multiple, Map<Long, QueueEntry> msgs) + public void collect(long deliveryTag, boolean multiple, Map<Long, MessageInstance> msgs) { if (multiple) { @@ -54,7 +55,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap } else { - final QueueEntry entry = get(deliveryTag); + final MessageInstance entry = get(deliveryTag); if(entry != null) { msgs.put(deliveryTag, entry); @@ -63,7 +64,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap } - public void remove(Map<Long,QueueEntry> msgs) + public void remove(Map<Long,MessageInstance> msgs) { synchronized (_lock) { @@ -74,12 +75,12 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap } } - public QueueEntry remove(long deliveryTag) + public MessageInstance remove(long deliveryTag) { synchronized (_lock) { - QueueEntry message = _map.remove(deliveryTag); + MessageInstance message = _map.remove(deliveryTag); if(message != null) { _unackedSize -= message.getMessage().getSize(); @@ -94,8 +95,8 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap { synchronized (_lock) { - Set<Map.Entry<Long, QueueEntry>> currentEntries = _map.entrySet(); - for (Map.Entry<Long, QueueEntry> entry : currentEntries) + Set<Map.Entry<Long, MessageInstance>> currentEntries = _map.entrySet(); + for (Map.Entry<Long, MessageInstance> entry : currentEntries) { visitor.callback(entry.getKey().longValue(), entry.getValue()); } @@ -103,7 +104,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap } } - public void add(long deliveryTag, QueueEntry message) + public void add(long deliveryTag, MessageInstance message) { synchronized (_lock) { @@ -113,12 +114,12 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap } } - public Collection<QueueEntry> cancelAllMessages() + public Collection<MessageInstance> cancelAllMessages() { synchronized (_lock) { - Collection<QueueEntry> currentEntries = _map.values(); - _map = new LinkedHashMap<Long, QueueEntry>(_prefetchLimit); + Collection<MessageInstance> currentEntries = _map.values(); + _map = new LinkedHashMap<Long, MessageInstance>(_prefetchLimit); _unackedSize = 0l; return currentEntries; } @@ -141,7 +142,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap } } - public QueueEntry get(long key) + public MessageInstance get(long key) { synchronized (_lock) { @@ -157,19 +158,19 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap } } - public Collection<QueueEntry> acknowledge(long deliveryTag, boolean multiple) + public Collection<MessageInstance> acknowledge(long deliveryTag, boolean multiple) { - Map<Long, QueueEntry> ackedMessageMap = new LinkedHashMap<Long,QueueEntry>(); + Map<Long, MessageInstance> ackedMessageMap = new LinkedHashMap<Long,MessageInstance>(); collect(deliveryTag, multiple, ackedMessageMap); remove(ackedMessageMap); return ackedMessageMap.values(); } - private void collect(long key, Map<Long, QueueEntry> msgs) + private void collect(long key, Map<Long, MessageInstance> msgs) { synchronized (_lock) { - for (Map.Entry<Long, QueueEntry> entry : _map.entrySet()) + for (Map.Entry<Long, MessageInstance> entry : _map.entrySet()) { msgs.put(entry.getKey(),entry.getValue()); if (entry.getKey() == key) diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java index e632fe02b4..53c0ae7381 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java @@ -30,6 +30,7 @@ import org.apache.qpid.framing.BasicGetEmptyBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.flow.FlowCreditManager; @@ -149,7 +150,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod() { - public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag) + public void recordMessageDelivery(final Subscription sub, final MessageInstance entry, final long deliveryTag) { channel.addUnacknowledgedMessage(entry, deliveryTag, null); } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java index 90f80a27e7..73559038f3 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java @@ -24,6 +24,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicRejectBody; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.queue.QueueEntry; @@ -65,7 +66,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR long deliveryTag = body.getDeliveryTag(); - QueueEntry message = channel.getUnacknowledgedMessageMap().get(deliveryTag); + MessageInstance message = channel.getUnacknowledgedMessageMap().get(deliveryTag); if (message == null) { @@ -73,16 +74,6 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR } else { - if (message.isQueueDeleted()) - { - _logger.warn("Message's Queue has already been purged, dropping message"); - message = channel.getUnacknowledgedMessageMap().remove(deliveryTag); - if(message != null) - { - message.delete(); - } - return; - } if (message.getMessage() == null) { @@ -100,11 +91,11 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR if (body.getRequeue()) { - channel.requeue(deliveryTag); - //this requeue represents a message rejected from the pre-dispatch queue //therefore we need to amend the delivery counter. message.decrementDeliveryCount(); + + channel.requeue(deliveryTag); } else { diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java index c3514d3778..d542c82a5b 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java @@ -28,6 +28,7 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.flow.LimitlessCreditManager; import org.apache.qpid.server.flow.Pre0_10CreditManager; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.StoredMessage; @@ -194,8 +195,8 @@ public class AckTest extends QpidTestCase { assertTrue(deliveryTag == i); i++; - QueueEntry unackedMsg = map.get(deliveryTag); - assertTrue(unackedMsg.getQueue() == _queue); + MessageInstance unackedMsg = map.get(deliveryTag); + assertTrue(unackedMsg.getOwningResource() == _queue); } } @@ -275,8 +276,8 @@ public class AckTest extends QpidTestCase for (long deliveryTag : deliveryTagSet) { assertTrue(deliveryTag == i); - QueueEntry unackedMsg = map.get(deliveryTag); - assertTrue(unackedMsg.getQueue() == _queue); + MessageInstance unackedMsg = map.get(deliveryTag); + assertTrue(unackedMsg.getOwningResource() == _queue); // 5 is the delivery tag of the message that *should* be removed if (++i == 5) { @@ -314,8 +315,8 @@ public class AckTest extends QpidTestCase for (long deliveryTag : deliveryTagSet) { assertTrue(deliveryTag == i + 5); - QueueEntry unackedMsg = map.get(deliveryTag); - assertTrue(unackedMsg.getQueue() == _queue); + MessageInstance unackedMsg = map.get(deliveryTag); + assertTrue(unackedMsg.getOwningResource() == _queue); ++i; } } @@ -346,8 +347,8 @@ public class AckTest extends QpidTestCase for (long deliveryTag : deliveryTagSet) { assertTrue(deliveryTag == i + 5); - QueueEntry unackedMsg = map.get(deliveryTag); - assertTrue(unackedMsg.getQueue() == _queue); + MessageInstance unackedMsg = map.get(deliveryTag); + assertTrue(unackedMsg.getOwningResource() == _queue); ++i; } } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java index a6e5f29964..9fa9e49c6b 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.protocol.v0_8; import junit.framework.TestCase; import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; @@ -63,7 +64,6 @@ public class ExtractResendAndRequeueTest extends TestCase private UnacknowledgedMessageMapImpl _unacknowledgedMessageMap; private static final int INITIAL_MSG_COUNT = 10; private AMQQueue _queue; - private MessageStore _messageStore = new TestMemoryMessageStore(); private LinkedList<QueueEntry> _referenceList = new LinkedList<QueueEntry>(); private Subscription _subscription; private boolean _queueDeleted; @@ -141,12 +141,12 @@ public class ExtractResendAndRequeueTest extends TestCase //We don't need the subscription object here. acquireMessages(_referenceList); - final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>(); - final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>(); + final Map<Long, MessageInstance> msgToRequeue = new LinkedHashMap<Long, MessageInstance>(); + final Map<Long, MessageInstance> msgToResend = new LinkedHashMap<Long, MessageInstance>(); // requeueIfUnableToResend doesn't matter here. _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue, - msgToResend, true, _messageStore)); + msgToResend)); assertEquals("Message count for resend not correct.", INITIAL_MSG_COUNT, msgToResend.size()); assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size()); @@ -170,94 +170,17 @@ public class ExtractResendAndRequeueTest extends TestCase // Close subscription when(_subscription.isClosed()).thenReturn(true); - final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>(); - final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>(); + final Map<Long, MessageInstance> msgToRequeue = new LinkedHashMap<Long, MessageInstance>(); + final Map<Long, MessageInstance> msgToResend = new LinkedHashMap<Long, MessageInstance>(); // requeueIfUnableToResend doesn't matter here. _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue, - msgToResend, true, _messageStore)); + msgToResend)); assertEquals("Message count for resend not correct.", 0, msgToResend.size()); assertEquals("Message count for requeue not correct.", INITIAL_MSG_COUNT, msgToRequeue.size()); assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size()); } - /** - * If the subscription is null, due to message being retrieved via a GET, And we request that messages are requeued - * requeueIfUnableToResend(set to true) then all messages should be sent to the msgToRequeue map. - * - * @throws AMQException the visit interface throws this - */ - - public void testRequeueDueToMessageHavingNoConsumerTag() throws AMQException - { - final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>(); - final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>(); - - // requeueIfUnableToResend = true so all messages should go to msgToRequeue - _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue, - msgToResend, true, _messageStore)); - - assertEquals("Message count for resend not correct.", 0, msgToResend.size()); - assertEquals("Message count for requeue not correct.", INITIAL_MSG_COUNT, msgToRequeue.size()); - assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size()); - } - - /** - * If the subscription is null, due to message being retrieved via a GET, And we request that we don't - * requeueIfUnableToResend(set to false) then all messages should be dropped as we do not have a dead letter queue. - * - * @throws AMQException the visit interface throws this - */ - - public void testDrop() throws AMQException - { - final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>(); - final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>(); - - // requeueIfUnableToResend = false so all messages should be dropped all maps should be empty - _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue, - msgToResend, false, _messageStore)); - - assertEquals("Message count for resend not correct.", 0, msgToResend.size()); - assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size()); - assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size()); - - - for (QueueEntry entry : _referenceList) - { - assertTrue("Message was not discarded", entry.isDeleted()); - } - - } - - /** - * If the subscription is null, due to message being retrieved via a GET, AND the queue upon which the message was - * delivered has been deleted then it is not possible to requeue. Currently we simply discard the message but in the - * future we may wish to dead letter the message. - * - * Validate that at the end of the visit all Maps are empty and all messages are marked as deleted - * - * @throws AMQException the visit interface throws this - */ - public void testDiscard() throws AMQException - { - final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>(); - final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>(); - - _queueDeleted = true; - // requeueIfUnableToResend : value doesn't matter here as queue has been deleted - _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue, - msgToResend, false, _messageStore)); - - assertEquals("Message count for resend not correct.", 0, msgToResend.size()); - assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size()); - assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size()); - - for (QueueEntry entry : _referenceList) - { - assertTrue("Message was not discarded", entry.isDeleted()); - } - } } diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 6cdd6f3119..390944ad3a 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -65,6 +65,7 @@ import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.TopicExchange; import org.apache.qpid.server.filter.JMSSelectorFilter; import org.apache.qpid.server.filter.SimpleFilterManager; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; @@ -85,14 +86,14 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS private SubscriptionTarget_1_0 _target; private boolean _draining; - private final Map<Binary, QueueEntry> _unsettledMap = - new HashMap<Binary, QueueEntry>(); + private final Map<Binary, MessageInstance> _unsettledMap = + new HashMap<Binary, MessageInstance>(); private final ConcurrentHashMap<Binary, UnsettledAction> _unsettledActionMap = new ConcurrentHashMap<Binary, UnsettledAction>(); private volatile SendingLinkAttachment _linkAttachment; private TerminusDurability _durability; - private List<QueueEntry> _resumeFullTransfers = new ArrayList<QueueEntry>(); + private List<MessageInstance> _resumeFullTransfers = new ArrayList<MessageInstance>(); private List<Binary> _resumeAcceptedTransfers = new ArrayList<Binary>(); private Runnable _closeAction; private final AMQQueue _queue; @@ -559,7 +560,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS } } - public void addUnsettled(Binary tag, UnsettledAction unsettledAction, QueueEntry queueEntry) + public void addUnsettled(Binary tag, UnsettledAction unsettledAction, MessageInstance queueEntry) { _unsettledActionMap.put(tag,unsettledAction); if(getTransactionId() == null) @@ -631,14 +632,14 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS SendingLinkEndpoint endpoint = linkAttachment.getEndpoint(); endpoint.setDeliveryStateHandler(this); Map initialUnsettledMap = endpoint.getInitialUnsettledMap(); - Map<Binary, QueueEntry> unsettledCopy = new HashMap<Binary, QueueEntry>(_unsettledMap); + Map<Binary, MessageInstance> unsettledCopy = new HashMap<Binary, MessageInstance>(_unsettledMap); _resumeAcceptedTransfers.clear(); _resumeFullTransfers.clear(); - for(Map.Entry<Binary, QueueEntry> entry : unsettledCopy.entrySet()) + for(Map.Entry<Binary, MessageInstance> entry : unsettledCopy.entrySet()) { Binary deliveryTag = entry.getKey(); - final QueueEntry queueEntry = entry.getValue(); + final MessageInstance queueEntry = entry.getValue(); if(initialUnsettledMap == null || !initialUnsettledMap.containsKey(deliveryTag)) { queueEntry.setRedelivered(); @@ -706,9 +707,9 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS public Map getUnsettledOutcomeMap() { - Map<Binary, QueueEntry> unsettled = new HashMap<Binary, QueueEntry>(_unsettledMap); + Map<Binary, MessageInstance> unsettled = new HashMap<Binary, MessageInstance>(_unsettledMap); - for(Map.Entry<Binary, QueueEntry> entry : unsettled.entrySet()) + for(Map.Entry<Binary, MessageInstance> entry : unsettled.entrySet()) { entry.setValue(null); } @@ -720,4 +721,9 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { _closeAction = action; } + + public VirtualHost getVirtualHost() + { + return _vhost; + } } diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java index 14d9934c1d..a3793392d5 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java @@ -38,6 +38,7 @@ import org.apache.qpid.amqp_1_0.type.messaging.Released; import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; import org.apache.qpid.amqp_1_0.type.transport.Transfer; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -112,7 +113,7 @@ class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget } } - public void send(QueueEntry entry, boolean batch) throws AMQException + public void send(MessageInstance entry, boolean batch) throws AMQException { // TODO send(entry); @@ -123,7 +124,7 @@ class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget // TODO } - public void send(final QueueEntry queueEntry) throws AMQException + public void send(final MessageInstance queueEntry) throws AMQException { ServerMessage serverMessage = queueEntry.getMessage(); Message_1_0 message; @@ -134,7 +135,7 @@ class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget else { final MessageConverter converter = MessageConverterRegistry.getConverter(serverMessage.getClass(), Message_1_0.class); - message = (Message_1_0) converter.convert(serverMessage, queueEntry.getQueue().getVirtualHost()); + message = (Message_1_0) converter.convert(serverMessage, _link.getVirtualHost()); } Transfer transfer = new Transfer(); @@ -344,10 +345,10 @@ class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget private class DispositionAction implements UnsettledAction { - private final QueueEntry _queueEntry; + private final MessageInstance _queueEntry; private final Binary _deliveryTag; - public DispositionAction(Binary tag, QueueEntry queueEntry) + public DispositionAction(Binary tag, MessageInstance queueEntry) { _deliveryTag = tag; _queueEntry = queueEntry; @@ -378,7 +379,7 @@ class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget if(outcome instanceof Accepted) { - txn.dequeue(_queueEntry.getQueue(), _queueEntry.getMessage(), + txn.dequeue(_queueEntry.getOwningResource(), _queueEntry.getMessage(), new ServerTransaction.Action() { @@ -469,7 +470,7 @@ class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget private class DoNothingAction implements UnsettledAction { public DoNothingAction(final Binary tag, - final QueueEntry queueEntry) + final MessageInstance queueEntry) { } |