summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-02-05 00:26:35 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-02-05 00:26:35 +0000
commitbd6a93bc29c3b4f8d2ed572f46e020a541feba9e (patch)
tree1c553b028698d83f0a9efee7d6fcfe6bb905eafd
parent3878f34525cde97ad35f27caf81adf41ab325730 (diff)
downloadqpid-python-bd6a93bc29c3b4f8d2ed572f46e020a541feba9e.tar.gz
define Subscription and SubscriptionTarget in terms of MessageInstance rather than QueueEntry
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-amqp-1-0-management@1564581 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java16
-rw-r--r--java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java36
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java48
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java37
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java13
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSubscription.java15
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java18
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java2
-rwxr-xr-xjava/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java2
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java15
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java4
-rwxr-xr-xjava/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java10
-rwxr-xr-xjava/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java10
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java10
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java21
-rwxr-xr-xjava/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java10
-rwxr-xr-xjava/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java7
-rwxr-xr-xjava/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java20
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java5
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java13
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java40
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java6
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java6
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java7
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java7
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java7
-rwxr-xr-xjava/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java5
-rwxr-xr-xjava/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java5
-rwxr-xr-xjava/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java5
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java5
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java56
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java133
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java64
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java4
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java15
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java13
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java37
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java3
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java17
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java17
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java91
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java24
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java15
43 files changed, 449 insertions, 445 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java
index e543cfb719..8fb011152c 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java
@@ -73,7 +73,7 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction
output.writeInt(records.length);
for(Transaction.Record record : records)
{
- UUID id = record.getQueue().getId();
+ UUID id = record.getResource().getId();
output.writeLong(id.getMostSignificantBits());
output.writeLong(id.getLeastSignificantBits());
output.writeLong(record.getMessage().getMessageNumber());
@@ -93,7 +93,7 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction
_queueId = queueId;
}
- public TransactionLogResource getQueue()
+ public TransactionLogResource getResource()
{
return this;
}
@@ -119,9 +119,21 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction
}
@Override
+ public String getName()
+ {
+ return _queueId.toString();
+ }
+
+ @Override
public UUID getId()
{
return _queueId;
}
+
+ @Override
+ public boolean isDurable()
+ {
+ return true;
+ }
}
}
diff --git a/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index 0f4cdf1078..cecd39381e 100644
--- a/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -416,10 +416,22 @@ public class BDBMessageStoreTest extends MessageStoreTest
TransactionLogResource mockQueue = new TransactionLogResource()
{
@Override
+ public String getName()
+ {
+ return getId().toString();
+ }
+
+ @Override
public UUID getId()
{
return mockQueueId;
}
+
+ @Override
+ public boolean isDurable()
+ {
+ return true;
+ }
};
Transaction txn = log.newTransaction();
@@ -454,10 +466,22 @@ public class BDBMessageStoreTest extends MessageStoreTest
TransactionLogResource mockQueue = new TransactionLogResource()
{
@Override
+ public String getName()
+ {
+ return getId().toString();
+ }
+
+ @Override
public UUID getId()
{
return mockQueueId;
}
+
+ @Override
+ public boolean isDurable()
+ {
+ return true;
+ }
};
Transaction txn = log.newTransaction();
@@ -511,10 +535,22 @@ public class BDBMessageStoreTest extends MessageStoreTest
TransactionLogResource mockQueue = new TransactionLogResource()
{
@Override
+ public String getName()
+ {
+ return getId().toString();
+ }
+
+ @Override
public UUID getId()
{
return mockQueueId;
}
+
+ @Override
+ public boolean isDurable()
+ {
+ return true;
+ }
};
Transaction txn = log.newTransaction();
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
index 39303a0715..80fa93c417 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
@@ -22,12 +22,58 @@ package org.apache.qpid.server.message;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.filter.Filterable;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.StateChangeListener;
public interface MessageInstance
{
+ /**
+ * Number of times this queue entry has been delivered.
+ *
+ * @return delivery count
+ */
+ int getDeliveryCount();
+
+ void incrementDeliveryCount();
+
+ void decrementDeliveryCount();
+
+ void addStateChangeListener(StateChangeListener<QueueEntry, State> listener);
+
+ boolean removeStateChangeListener(StateChangeListener<QueueEntry, State> listener);
+
+ boolean acquiredBySubscription();
+
+ boolean isAcquiredBy(Subscription subscription);
+
+ void setRedelivered();
+
+ boolean isRedelivered();
+
+ Subscription getDeliveredSubscription();
+
+ void reject();
+
+ boolean isRejectedBy(Subscription subscription);
+
+ boolean getDeliveredToConsumer();
+
+ boolean expired() throws AMQException;
+
+ boolean acquire(Subscription sub);
+
+ int getMaximumDeliveryCount();
+
+ int routeToAlternate(Action<QueueEntry> action, ServerTransaction txn);
+
+ Filterable asFilterable();
public static enum State
{
@@ -164,4 +210,6 @@ public interface MessageInstance
ServerMessage getMessage();
InstanceProperties getInstanceProperties();
+
+ TransactionLogResource getOwningResource();
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index 5dd1198ba8..61f38f963c 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -35,48 +35,11 @@ public interface QueueEntry extends MessageInstance, Comparable<QueueEntry>
long getSize();
- boolean getDeliveredToConsumer();
-
- boolean expired() throws AMQException;
-
- boolean acquire(Subscription sub);
-
- boolean acquiredBySubscription();
- boolean isAcquiredBy(Subscription subscription);
-
- void setRedelivered();
-
- boolean isRedelivered();
-
- Subscription getDeliveredSubscription();
-
- void reject();
-
- boolean isRejectedBy(Subscription subscription);
-
- int routeToAlternate(final Action<QueueEntry> action, ServerTransaction txn);
-
boolean isQueueDeleted();
QueueEntry getNextNode();
QueueEntry getNextValidEntry();
- void addStateChangeListener(StateChangeListener<QueueEntry, State> listener);
- boolean removeStateChangeListener(StateChangeListener<QueueEntry, State> listener);
-
-
- /**
- * Number of times this queue entry has been delivered.
- *
- * @return delivery count
- */
- int getDeliveryCount();
-
- void incrementDeliveryCount();
-
- void decrementDeliveryCount();
-
- Filterable asFilterable();
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 18a5db597f..ea50337b61 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -28,6 +28,7 @@ import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -456,6 +457,12 @@ public abstract class QueueEntryImpl implements QueueEntry
return _deliveryCount;
}
+ @Override
+ public int getMaximumDeliveryCount()
+ {
+ return getQueue().getMaximumDeliveryCount();
+ }
+
public void incrementDeliveryCount()
{
_deliveryCountUpdater.incrementAndGet(this);
@@ -491,6 +498,12 @@ public abstract class QueueEntryImpl implements QueueEntry
return false;
}
+ @Override
+ public TransactionLogResource getOwningResource()
+ {
+ return getQueue();
+ }
+
private static class EntryInstanceProperties implements InstanceProperties
{
private final EnumMap<Property, Object> _properties = new EnumMap<Property, Object>(Property.class);
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSubscription.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSubscription.java
index 08ecf58855..f683159e12 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSubscription.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSubscription.java
@@ -29,6 +29,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.SubscriptionMessages;
import org.apache.qpid.server.logging.subjects.QueueLogSubject;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
@@ -225,13 +226,13 @@ class QueueSubscription<T extends SubscriptionTarget> implements Subscription
}
@Override
- public boolean wouldSuspend(final QueueEntry msg)
+ public boolean wouldSuspend(final MessageInstance msg)
{
return !_target.allocateCredit(msg.getMessage());
}
@Override
- public void restoreCredit(final QueueEntry queueEntry)
+ public void restoreCredit(final MessageInstance queueEntry)
{
_target.restoreCredit(queueEntry.getMessage());
}
@@ -298,9 +299,9 @@ class QueueSubscription<T extends SubscriptionTarget> implements Subscription
}
@Override
- public boolean resend(final QueueEntry entry) throws AMQException
+ public boolean resend(final MessageInstance entry) throws AMQException
{
- return getQueue().resend(entry, this);
+ return getQueue().resend((QueueEntry)entry, this);
}
final SubFlushRunner getRunner()
@@ -353,7 +354,7 @@ class QueueSubscription<T extends SubscriptionTarget> implements Subscription
_noLocal = noLocal;
}
- public final boolean hasInterest(QueueEntry entry)
+ public final boolean hasInterest(MessageInstance entry)
{
//check that the message hasn't been rejected
if (entry.isRejectedBy(this))
@@ -429,7 +430,7 @@ class QueueSubscription<T extends SubscriptionTarget> implements Subscription
return _createTime;
}
- public final QueueEntry.SubscriptionAcquiredState getOwningState()
+ public final MessageInstance.SubscriptionAcquiredState getOwningState()
{
return _owningState;
}
@@ -464,7 +465,7 @@ class QueueSubscription<T extends SubscriptionTarget> implements Subscription
return _deliveredCount.longValue();
}
- public final void send(final QueueEntry entry, final boolean batch) throws AMQException
+ public final void send(final MessageInstance entry, final boolean batch) throws AMQException
{
_deliveredCount.incrementAndGet();
_deliveredBytes.addAndGet(entry.getMessage().getSize());
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index e490ac38c1..97549a3c1e 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -1065,7 +1065,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
stmt.setString(4, "E");
for(Transaction.Record record : enqueues)
{
- stmt.setString(5, record.getQueue().getId().toString());
+ stmt.setString(5, record.getResource().getId().toString());
stmt.setLong(6, record.getMessage().getMessageNumber());
stmt.executeUpdate();
}
@@ -1076,7 +1076,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
stmt.setString(4, "D");
for(Transaction.Record record : dequeues)
{
- stmt.setString(5, record.getQueue().getId().toString());
+ stmt.setString(5, record.getResource().getId().toString());
stmt.setLong(6, record.getMessage().getMessageNumber());
stmt.executeUpdate();
}
@@ -1371,7 +1371,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
@Override
- public TransactionLogResource getQueue()
+ public TransactionLogResource getResource()
{
return this;
}
@@ -1401,10 +1401,22 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
@Override
+ public String getName()
+ {
+ return _queueId.toString();
+ }
+
+ @Override
public UUID getId()
{
return _queueId;
}
+
+ @Override
+ public boolean isDurable()
+ {
+ return true;
+ }
}
protected void recoverXids(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh) throws SQLException
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java
index 66bcfff32b..74b91dec2d 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java
@@ -70,7 +70,7 @@ public interface Transaction
public static interface Record
{
- TransactionLogResource getQueue();
+ TransactionLogResource getResource();
EnqueueableMessage getMessage();
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java
index 576dca847d..18b3125641 100755
--- a/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java
@@ -24,5 +24,7 @@ import java.util.UUID;
public interface TransactionLogResource
{
+ String getName();
public UUID getId();
+ boolean isDurable();
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java
index e90502f023..572b076ba2 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java
@@ -23,9 +23,8 @@ package org.apache.qpid.server.subscription;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.util.StateChangeListener;
public interface Subscription
@@ -64,7 +63,7 @@ public interface Subscription
AMQSessionModel getSessionModel();
- QueueEntry.SubscriptionAcquiredState getOwningState();
+ MessageInstance.SubscriptionAcquiredState getOwningState();
void setNoLocal(boolean noLocal);
@@ -72,7 +71,7 @@ public interface Subscription
boolean isSuspended();
- boolean hasInterest(QueueEntry msg);
+ boolean hasInterest(MessageInstance msg);
boolean isClosed();
@@ -82,16 +81,16 @@ public interface Subscription
void close() throws AMQException;
- void send(QueueEntry entry, boolean batch) throws AMQException;
+ void send(MessageInstance entry, boolean batch) throws AMQException;
- boolean resend(QueueEntry entry) throws AMQException;
+ boolean resend(MessageInstance entry) throws AMQException;
void flushBatched();
void queueDeleted();
- boolean wouldSuspend(QueueEntry msg);
+ boolean wouldSuspend(MessageInstance msg);
boolean trySendLock();
@@ -100,7 +99,7 @@ public interface Subscription
void releaseSendLock();
- void restoreCredit(final QueueEntry queueEntry);
+ void restoreCredit(final MessageInstance queueEntry);
void setStateListener(final StateChangeListener<? extends Subscription, State> listener);
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java
index f1d77d9d42..80298cccc8 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java
@@ -21,9 +21,9 @@
package org.apache.qpid.server.subscription;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.util.StateChangeListener;
public interface SubscriptionTarget
@@ -49,7 +49,7 @@ public interface SubscriptionTarget
AMQSessionModel getSessionModel();
- void send(QueueEntry entry, boolean batch) throws AMQException;
+ void send(MessageInstance entry, boolean batch) throws AMQException;
void flushBatched();
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
index 1f5a4907ed..03717ed6ae 100755
--- a/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
@@ -25,12 +25,14 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.TransactionLogResource;
import java.util.Collection;
import java.util.List;
@@ -88,7 +90,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
- public void dequeue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+ public void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
{
Transaction txn = null;
try
@@ -158,15 +160,15 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
}
- public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction)
+ public void dequeue(Collection<MessageInstance> queueEntries, Action postTransactionAction)
{
Transaction txn = null;
try
{
- for(QueueEntry entry : queueEntries)
+ for(MessageInstance entry : queueEntries)
{
ServerMessage message = entry.getMessage();
- BaseQueue queue = entry.getQueue();
+ TransactionLogResource queue = entry.getOwningResource();
if(message.isPersistent() && queue.isDurable())
{
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java b/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
index b057998456..b9d91a647b 100755
--- a/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
@@ -25,11 +25,13 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.TransactionLogResource;
import java.util.Collection;
import java.util.List;
@@ -73,7 +75,7 @@ public class AutoCommitTransaction implements ServerTransaction
immediateAction.postCommit();
}
- public void dequeue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+ public void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
{
Transaction txn = null;
try
@@ -105,15 +107,15 @@ public class AutoCommitTransaction implements ServerTransaction
}
- public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction)
+ public void dequeue(Collection<MessageInstance> queueEntries, Action postTransactionAction)
{
Transaction txn = null;
try
{
- for(QueueEntry entry : queueEntries)
+ for(MessageInstance entry : queueEntries)
{
ServerMessage message = entry.getMessage();
- BaseQueue queue = entry.getQueue();
+ TransactionLogResource queue = entry.getOwningResource();
if(message.isPersistent() && queue.isDurable())
{
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java b/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
index d48b09d912..238facf4b5 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
@@ -22,10 +22,12 @@
package org.apache.qpid.server.txn;
import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Xid;
@@ -74,7 +76,7 @@ public class DistributedTransaction implements ServerTransaction
}
}
- public void dequeue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+ public void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
{
if(_branch != null)
{
@@ -87,13 +89,13 @@ public class DistributedTransaction implements ServerTransaction
}
}
- public void dequeue(Collection<QueueEntry> messages, Action postTransactionAction)
+ public void dequeue(Collection<MessageInstance> messages, Action postTransactionAction)
{
if(_branch != null)
{
- for(QueueEntry entry : messages)
+ for(MessageInstance entry : messages)
{
- _branch.dequeue(entry.getQueue(), entry.getMessage());
+ _branch.dequeue(entry.getOwningResource(), entry.getMessage());
}
_branch.addPostTransactionAction(postTransactionAction);
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java b/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
index 6b12862690..ada4eeb553 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
@@ -34,6 +34,7 @@ import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Xid;
@@ -335,7 +336,7 @@ public class DtxBranch
{
if(enqueue.isDurable())
{
- _transaction.enqueueMessage(enqueue.getQueue(), enqueue.getMessage());
+ _transaction.enqueueMessage(enqueue.getResource(), enqueue.getMessage());
}
}
@@ -344,7 +345,7 @@ public class DtxBranch
{
if(enqueue.isDurable())
{
- _transaction.dequeueMessage(enqueue.getQueue(), enqueue.getMessage());
+ _transaction.dequeueMessage(enqueue.getResource(), enqueue.getMessage());
}
}
}
@@ -356,9 +357,9 @@ public class DtxBranch
}
- public void dequeue(BaseQueue queue, EnqueueableMessage message)
+ public void dequeue(TransactionLogResource resource, EnqueueableMessage message)
{
- _dequeueRecords.add(new Record(queue, message));
+ _dequeueRecords.add(new Record(resource, message));
}
@@ -369,18 +370,18 @@ public class DtxBranch
private static final class Record implements Transaction.Record
{
- private final BaseQueue _queue;
+ private final TransactionLogResource _resource;
private final EnqueueableMessage _message;
- public Record(BaseQueue queue, EnqueueableMessage message)
+ public Record(TransactionLogResource resource, EnqueueableMessage message)
{
- _queue = queue;
+ _resource = resource;
_message = message;
}
- public BaseQueue getQueue()
+ public TransactionLogResource getResource()
{
- return _queue;
+ return _resource;
}
public EnqueueableMessage getMessage()
@@ -390,7 +391,7 @@ public class DtxBranch
public boolean isDurable()
{
- return _message.isPersistent() && _queue.isDurable();
+ return _message.isPersistent() && _resource.isDurable();
}
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
index 81aabc6bd3..93482153d3 100755
--- a/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
@@ -21,7 +21,9 @@
package org.apache.qpid.server.txn;
import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -91,7 +93,7 @@ public class LocalTransaction implements ServerTransaction
_postTransactionActions.add(postTransactionAction);
}
- public void dequeue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+ public void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
{
sync();
_postTransactionActions.add(postTransactionAction);
@@ -118,7 +120,7 @@ public class LocalTransaction implements ServerTransaction
}
}
- public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction)
+ public void dequeue(Collection<MessageInstance> queueEntries, Action postTransactionAction)
{
sync();
_postTransactionActions.add(postTransactionAction);
@@ -126,10 +128,10 @@ public class LocalTransaction implements ServerTransaction
try
{
- for(QueueEntry entry : queueEntries)
+ for(MessageInstance entry : queueEntries)
{
ServerMessage message = entry.getMessage();
- BaseQueue queue = entry.getQueue();
+ TransactionLogResource queue = entry.getOwningResource();
if(message.isPersistent() && queue.isDurable())
{
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java b/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
index 240ad154ba..3355a7ed06 100755
--- a/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
@@ -24,8 +24,9 @@ import java.util.Collection;
import java.util.List;
import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.TransactionLogResource;
/**
@@ -79,14 +80,14 @@ public interface ServerTransaction
*
* A store operation will result only for a persistent message on a durable queue.
*/
- void dequeue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction);
+ void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction);
/**
* Dequeue a message(s) from queue(s) registering a post transaction action.
*
* Store operations will result only for a persistent messages on durable queues.
*/
- void dequeue(Collection<QueueEntry> messages, Action postTransactionAction);
+ void dequeue(Collection<MessageInstance> messages, Action postTransactionAction);
/**
* Enqueue a message to a queue registering a post transaction action.
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
index b01f1d1ebc..f82490ee79 100755
--- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
@@ -119,7 +119,7 @@ public class VirtualHostConfigRecoveryHandler implements
}
for(Transaction.Record record : enqueues)
{
- final AMQQueue queue = _virtualHost.getQueue(record.getQueue().getId());
+ final AMQQueue queue = _virtualHost.getQueue(record.getResource().getId());
if(queue != null)
{
final long messageId = record.getMessage().getMessageNumber();
@@ -173,13 +173,13 @@ public class VirtualHostConfigRecoveryHandler implements
StringBuilder xidString = xidAsString(id);
CurrentActor.get().message(_logSubject,
TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
- record.getQueue().getId().toString()));
+ record.getResource().getId().toString()));
}
}
for(Transaction.Record record : dequeues)
{
- final AMQQueue queue = _virtualHost.getQueue(record.getQueue().getId());
+ final AMQQueue queue = _virtualHost.getQueue(record.getResource().getId());
if(queue != null)
{
final long messageId = record.getMessage().getMessageNumber();
@@ -223,7 +223,7 @@ public class VirtualHostConfigRecoveryHandler implements
StringBuilder xidString = xidAsString(id);
CurrentActor.get().message(_logSubject,
TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
- record.getQueue().getId().toString()));
+ record.getResource().getId().toString()));
}
}
@@ -312,10 +312,22 @@ public class VirtualHostConfigRecoveryHandler implements
new TransactionLogResource()
{
@Override
+ public String getName()
+ {
+ return "<<UNKNOWN>>";
+ }
+
+ @Override
public UUID getId()
{
return queueId;
}
+
+ @Override
+ public boolean isDurable()
+ {
+ return false;
+ }
};
txn.dequeueMessage(mockQueue, new DummyMessage(messageId));
txn.commitTranAsync();
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
index 1947a4e3b6..0d38b7002a 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
@@ -25,6 +25,7 @@ import junit.framework.AssertionFailedError;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import java.util.ArrayList;
@@ -68,7 +69,7 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
queue.registerSubscription(getSubscription(), null, null, "test", EnumSet.noneOf(Subscription.Option.class));
Thread.sleep(150);
- ArrayList<QueueEntry> msgs = getSubscription().getMessages();
+ ArrayList<MessageInstance> msgs = getSubscription().getMessages();
try
{
assertEquals(1L, msgs.get(0).getMessage().getMessageNumber());
@@ -87,7 +88,7 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
{
// Show message order on failure.
int index = 1;
- for (QueueEntry qe : msgs)
+ for (MessageInstance qe : msgs)
{
System.err.println(index + ":" + qe.getMessage().getMessageNumber());
index++;
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
index dbc04bc0cd..e74bc992a2 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
@@ -25,6 +25,7 @@ import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
@@ -45,6 +46,12 @@ public class MockQueueEntry implements QueueEntry
return false;
}
+ @Override
+ public int getMaximumDeliveryCount()
+ {
+ return 0;
+ }
+
public boolean acquiredBySubscription()
{
return false;
@@ -225,4 +232,10 @@ public class MockQueueEntry implements QueueEntry
{
return InstanceProperties.EMPTY;
}
+
+ @Override
+ public TransactionLogResource getOwningResource()
+ {
+ return getQueue();
+ }
}
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index db39eb80a9..51ae822b2e 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -40,6 +40,7 @@ import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.UUIDGenerator;
@@ -206,7 +207,8 @@ public class SimpleAMQQueueTest extends QpidTestCase
EnumSet.noneOf(Subscription.Option.class));
Thread.sleep(150);
assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
- assertNull("There should be no releasedEntry after an enqueue", _subscription.getQueueContext().getReleasedEntry());
+ assertNull("There should be no releasedEntry after an enqueue",
+ _subscription.getQueueContext().getReleasedEntry());
}
/**
@@ -222,7 +224,8 @@ public class SimpleAMQQueueTest extends QpidTestCase
EnumSet.noneOf(Subscription.Option.class));
Thread.sleep(150);
assertEquals(messageB, _subscription.getQueueContext().getLastSeenEntry().getMessage());
- assertNull("There should be no releasedEntry after enqueues", _subscription.getQueueContext().getReleasedEntry());
+ assertNull("There should be no releasedEntry after enqueues",
+ _subscription.getQueueContext().getReleasedEntry());
}
/**
@@ -273,7 +276,8 @@ public class SimpleAMQQueueTest extends QpidTestCase
assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered());
assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered());
assertFalse("Redelivery flag should remain be unset",queueEntries.get(2).isRedelivered());
- assertNull("releasedEntry should be cleared after requeue processed", _subscription.getQueueContext().getReleasedEntry());
+ assertNull("releasedEntry should be cleared after requeue processed",
+ _subscription.getQueueContext().getReleasedEntry());
}
/**
@@ -321,7 +325,8 @@ public class SimpleAMQQueueTest extends QpidTestCase
assertTrue("Expecting the queue entry to be now expired", queueEntries.get(0).expired());
assertEquals("Total number of messages sent should not have changed", 1, _subscriptionTarget.getMessages().size());
assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
- assertNull("releasedEntry should be cleared after requeue processed", _subscription.getQueueContext().getReleasedEntry());
+ assertNull("releasedEntry should be cleared after requeue processed",
+ _subscription.getQueueContext().getReleasedEntry());
}
@@ -375,7 +380,8 @@ public class SimpleAMQQueueTest extends QpidTestCase
assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered());
assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered());
assertTrue("Redelivery flag should now be set",queueEntries.get(2).isRedelivered());
- assertNull("releasedEntry should be cleared after requeue processed", _subscription.getQueueContext().getReleasedEntry());
+ assertNull("releasedEntry should be cleared after requeue processed",
+ _subscription.getQueueContext().getReleasedEntry());
}
@@ -430,8 +436,10 @@ public class SimpleAMQQueueTest extends QpidTestCase
assertEquals("Unexpected total number of messages sent to both subscriptions after release",
3,
target1.getMessages().size() + target2.getMessages().size());
- assertNull("releasedEntry should be cleared after requeue processed", subscription1.getQueueContext().getReleasedEntry());
- assertNull("releasedEntry should be cleared after requeue processed", subscription2.getQueueContext().getReleasedEntry());
+ assertNull("releasedEntry should be cleared after requeue processed",
+ subscription1.getQueueContext().getReleasedEntry());
+ assertNull("releasedEntry should be cleared after requeue processed",
+ subscription2.getQueueContext().getReleasedEntry());
}
public void testExclusiveConsumer() throws AMQException
@@ -726,9 +734,9 @@ public class SimpleAMQQueueTest extends QpidTestCase
});
// check expected messages delivered to correct consumers
- verifyReceivedMessages(Arrays.asList(msg1,msg2,msg3), sub1.getMessages());
- verifyReceivedMessages(Collections.singletonList(msg4), sub2.getMessages());
- verifyReceivedMessages(Collections.singletonList(msg5), sub3.getMessages());
+ verifyReceivedMessages(Arrays.asList((MessageInstance)msg1,msg2,msg3), sub1.getMessages());
+ verifyReceivedMessages(Collections.singletonList((MessageInstance)msg4), sub2.getMessages());
+ verifyReceivedMessages(Collections.singletonList((MessageInstance)msg5), sub3.getMessages());
}
/**
@@ -919,7 +927,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
* @param entry
* @param batch
*/
- public void send(QueueEntry entry, boolean batch) throws AMQException
+ public void send(MessageInstance entry, boolean batch) throws AMQException
{
super.send(entry, batch);
latch.countDown();
@@ -953,7 +961,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
{
Thread.currentThread().interrupt();
}
- List<QueueEntry> expected = Arrays.asList(entries.get(0), entries.get(2), entries.get(3));
+ List<MessageInstance> expected = Arrays.asList((MessageInstance)entries.get(0), entries.get(2), entries.get(3));
verifyReceivedMessages(expected, subscription.getMessages());
}
@@ -1027,7 +1035,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
putGivenNumberOfMessages(queue, 4);
// assert received messages
- List<QueueEntry> messages = subscription.getMessages();
+ List<MessageInstance> messages = subscription.getMessages();
assertEquals("Only 2 messages should be returned", 2, messages.size());
assertEquals("ID of first message should be 1", 1l,
(messages.get(0).getMessage()).getMessageNumber());
@@ -1222,13 +1230,13 @@ public class SimpleAMQQueueTest extends QpidTestCase
return entriesList;
}
- private void verifyReceivedMessages(List<QueueEntry> expected,
- List<QueueEntry> delivered)
+ private void verifyReceivedMessages(List<MessageInstance> expected,
+ List<MessageInstance> delivered)
{
assertEquals("Consumer did not receive the expected number of messages",
expected.size(), delivered.size());
- for (QueueEntry msg : expected)
+ for (MessageInstance msg : expected)
{
assertTrue("Consumer did not receive msg: "
+ msg.getMessage().getMessageNumber(), delivered.contains(msg));
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
index 3dfe057285..d3ee938586 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
@@ -464,7 +464,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
}
@Override
- public TransactionLogResource getQueue()
+ public TransactionLogResource getResource()
{
return _queue;
}
@@ -505,7 +505,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
{
return false;
}
- if (_queue == null && other.getQueue() != null)
+ if (_queue == null && other.getResource() != null)
{
return false;
}
@@ -513,7 +513,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
{
return false;
}
- return _queue.getId().equals(other.getQueue().getId());
+ return _queue.getId().equals(other.getResource().getId());
}
}
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java b/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
index 121c380736..7a4f92f0ca 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
@@ -154,6 +154,12 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple
return _transactionResource;
}
+ @Override
+ public boolean isDurable()
+ {
+ return true;
+ }
+
private static class TestMessage implements EnqueueableMessage
{
private final StoredMessage<?> _handle;
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
index cc5235a420..2ee72b9a36 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
@@ -28,6 +28,7 @@ import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.filter.SimpleFilterManager;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
@@ -54,7 +55,7 @@ public class MockSubscription implements SubscriptionTarget
private AMQQueue queue = null;
private StateChangeListener<SubscriptionTarget, State> _listener = null;
private State _state = State.ACTIVE;
- private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>();
+ private ArrayList<MessageInstance> messages = new ArrayList<MessageInstance>();
private final Lock _stateChangeLock = new ReentrantLock();
private static final AtomicLong idGenerator = new AtomicLong(0);
@@ -156,7 +157,7 @@ public class MockSubscription implements SubscriptionTarget
{
}
- public void send(QueueEntry entry, boolean batch) throws AMQException
+ public void send(MessageInstance entry, boolean batch) throws AMQException
{
if (messages.contains(entry))
{
@@ -203,7 +204,7 @@ public class MockSubscription implements SubscriptionTarget
_listener = listener;
}
- public ArrayList<QueueEntry> getMessages()
+ public ArrayList<MessageInstance> getMessages()
{
return messages;
}
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
index 3c66a4c94b..11b9bbe1b4 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.txn;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MockAMQQueue;
@@ -47,7 +48,7 @@ public class AutoCommitTransactionTest extends QpidTestCase
private MessageStore _transactionLog;
private AMQQueue _queue;
private List<AMQQueue> _queues;
- private Collection<QueueEntry> _queueEntries;
+ private Collection<MessageInstance> _queueEntries;
private ServerMessage _message;
private MockAction _action;
private MockStoreTransaction _storeTransaction;
@@ -373,9 +374,9 @@ public class AutoCommitTransactionTest extends QpidTestCase
assertFalse("Rollback action must be fired", _action.isRollbackActionFired());
}
- private Collection<QueueEntry> createTestQueueEntries(boolean[] queueDurableFlags, boolean[] messagePersistentFlags)
+ private Collection<MessageInstance> createTestQueueEntries(boolean[] queueDurableFlags, boolean[] messagePersistentFlags)
{
- Collection<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
+ Collection<MessageInstance> queueEntries = new ArrayList<MessageInstance>();
assertTrue("Boolean arrays must be the same length", queueDurableFlags.length == messagePersistentFlags.length);
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
index f3f5e00346..80e794e0ff 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.txn;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MockAMQQueue;
@@ -46,7 +47,7 @@ public class LocalTransactionTest extends QpidTestCase
private AMQQueue _queue;
private List<AMQQueue> _queues;
- private Collection<QueueEntry> _queueEntries;
+ private Collection<MessageInstance> _queueEntries;
private ServerMessage _message;
private MockAction _action1;
private MockAction _action2;
@@ -597,9 +598,9 @@ public class LocalTransactionTest extends QpidTestCase
assertEquals("Transaction update time should be reset after rollback", 0, _transaction.getTransactionUpdateTime());
}
- private Collection<QueueEntry> createTestQueueEntries(boolean[] queueDurableFlags, boolean[] messagePersistentFlags)
+ private Collection<MessageInstance> createTestQueueEntries(boolean[] queueDurableFlags, boolean[] messagePersistentFlags)
{
- Collection<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
+ Collection<MessageInstance> queueEntries = new ArrayList<MessageInstance>();
assertTrue("Boolean arrays must be the same length", queueDurableFlags.length == messagePersistentFlags.length);
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
index 04510064de..b7788911c7 100755
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v0_10;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.queue.QueueEntry;
@@ -30,10 +31,10 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
private static final Logger _logger = Logger.getLogger(ExplicitAcceptDispositionChangeListener.class);
- private final QueueEntry _entry;
+ private final MessageInstance _entry;
private final SubscriptionTarget_0_10 _target;
- public ExplicitAcceptDispositionChangeListener(QueueEntry entry, SubscriptionTarget_0_10 target)
+ public ExplicitAcceptDispositionChangeListener(MessageInstance entry, SubscriptionTarget_0_10 target)
{
_entry = entry;
_target = target;
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
index 0cdced728a..0c238c4d55 100755
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v0_10;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.queue.QueueEntry;
class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
@@ -29,10 +30,10 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
private static final Logger _logger = Logger.getLogger(ImplicitAcceptDispositionChangeListener.class);
- private final QueueEntry _entry;
+ private final MessageInstance _entry;
private SubscriptionTarget_0_10 _target;
- public ImplicitAcceptDispositionChangeListener(QueueEntry entry, SubscriptionTarget_0_10 target)
+ public ImplicitAcceptDispositionChangeListener(MessageInstance entry, SubscriptionTarget_0_10 target)
{
_entry = entry;
_target = target;
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
index 9b5272d413..71ad60c7b8 100755
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
@@ -21,17 +21,18 @@
package org.apache.qpid.server.protocol.v0_10;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.transport.Method;
public class MessageAcceptCompletionListener implements Method.CompletionListener
{
private final SubscriptionTarget_0_10 _sub;
- private final QueueEntry _entry;
+ private final MessageInstance _entry;
private final ServerSession _session;
private boolean _restoreCredit;
- public MessageAcceptCompletionListener(SubscriptionTarget_0_10 sub, ServerSession session, QueueEntry entry, boolean restoreCredit)
+ public MessageAcceptCompletionListener(SubscriptionTarget_0_10 sub, ServerSession session, MessageInstance entry, boolean restoreCredit)
{
super();
_sub = sub;
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 098ddc2fae..73263bd931 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -55,6 +55,7 @@ import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
@@ -399,9 +400,9 @@ public class ServerSession extends Session
// Broker shouldn't block awaiting close - thus do override this method to do nothing
}
- public void acknowledge(final SubscriptionTarget_0_10 sub, final QueueEntry entry)
+ public void acknowledge(final SubscriptionTarget_0_10 sub, final MessageInstance entry)
{
- _transaction.dequeue(entry.getQueue(), entry.getMessage(),
+ _transaction.dequeue(entry.getOwningResource(), entry.getMessage(),
new ServerTransaction.Action()
{
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java
index aee7acc99b..304b32b250 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java
@@ -27,11 +27,13 @@ import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.subscription.AbstractSubscriptionTarget;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -193,7 +195,7 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen
private final AddMessageDispositionListenerAction _postIdSettingAction;
- public void send(final QueueEntry entry, boolean batch) throws AMQException
+ public void send(final MessageInstance entry, boolean batch) throws AMQException
{
ServerMessage serverMsg = entry.getMessage();
@@ -275,7 +277,7 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen
{
public void onComplete(Method method)
{
- deferredAddCredit(1, entry.getSize());
+ deferredAddCredit(1, entry.getMessage().getSize());
}
});
}
@@ -309,10 +311,10 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen
}
- void recordUnacknowledged(QueueEntry entry)
+ void recordUnacknowledged(MessageInstance entry)
{
_unacknowledgedCount.incrementAndGet();
- _unacknowledgedBytes.addAndGet(entry.getSize());
+ _unacknowledgedBytes.addAndGet(entry.getMessage().getSize());
}
private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit)
@@ -334,10 +336,10 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen
}
}
- private void forceDequeue(final QueueEntry entry, final boolean restoreCredit)
+ private void forceDequeue(final MessageInstance entry, final boolean restoreCredit)
{
AutoCommitTransaction dequeueTxn = new AutoCommitTransaction(_session.getVirtualHost().getMessageStore());
- dequeueTxn.dequeue(entry.getQueue(), entry.getMessage(),
+ dequeueTxn.dequeue(entry.getOwningResource(), entry.getMessage(),
new ServerTransaction.Action()
{
public void postCommit()
@@ -356,7 +358,7 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen
});
}
- void reject(final QueueEntry entry)
+ void reject(final MessageInstance entry)
{
entry.setRedelivered();
entry.routeToAlternate(null, null);
@@ -366,7 +368,7 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen
}
}
- void release(final QueueEntry entry, final boolean setRedelivered)
+ void release(final MessageInstance entry, final boolean setRedelivered)
{
if (setRedelivered)
{
@@ -388,7 +390,7 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen
}
}
- protected void sendToDLQOrDiscard(QueueEntry entry)
+ protected void sendToDLQOrDiscard(MessageInstance entry)
{
final LogActor logActor = CurrentActor.get();
final ServerMessage msg = entry.getMessage();
@@ -405,26 +407,30 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen
if (requeues == 0)
{
- final AMQQueue queue = entry.getQueue();
- final Exchange alternateExchange = queue.getAlternateExchange();
-
- if(alternateExchange != null)
- {
- logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(),
- alternateExchange.getName()));
- }
- else
+ TransactionLogResource owningResource = entry.getOwningResource();
+ if(owningResource instanceof AMQQueue)
{
- logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(),
- queue.getName(),
- msg.getRoutingKey()));
+ final AMQQueue queue = (AMQQueue)owningResource;
+ final Exchange alternateExchange = queue.getAlternateExchange();
+
+ if(alternateExchange != null)
+ {
+ logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(),
+ alternateExchange.getName()));
+ }
+ else
+ {
+ logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(),
+ queue.getName(),
+ msg.getRoutingKey()));
+ }
}
}
}
- private boolean isMaxDeliveryLimitReached(QueueEntry entry)
+ private boolean isMaxDeliveryLimitReached(MessageInstance entry)
{
- final int maxDeliveryLimit = entry.getQueue().getMaximumDeliveryCount();
+ final int maxDeliveryLimit = entry.getMaximumDeliveryCount();
return (maxDeliveryLimit > 0 && entry.getDeliveryCount() >= maxDeliveryLimit);
}
@@ -519,12 +525,12 @@ public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implemen
return _stopped.get();
}
- public void acknowledge(QueueEntry entry)
+ public void acknowledge(MessageInstance entry)
{
// TODO Fix Store Context / cleanup
if(entry.isAcquiredBy(getSubscription()))
{
- _unacknowledgedBytes.addAndGet(-entry.getSize());
+ _unacknowledgedBytes.addAndGet(-entry.getMessage().getSize());
_unacknowledgedCount.decrementAndGet();
entry.delete();
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 516d6e48ff..ce1a132973 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -56,6 +56,7 @@ import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
@@ -67,7 +68,9 @@ import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionTarget;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor;
@@ -143,7 +146,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
private volatile boolean _rollingBack;
private static final Runnable NULL_TASK = new Runnable() { public void run() {} };
- private List<QueueEntry> _resendList = new ArrayList<QueueEntry>();
+ private List<MessageInstance> _resendList = new ArrayList<MessageInstance>();
private static final
AMQShortString IMMEDIATE_DELIVERY_REPLY_TEXT = new AMQShortString("Immediate delivery is not possible.");
private long _createTime = System.currentTimeMillis();
@@ -673,22 +676,13 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
* delivery tag)
* @param subscription The consumer that is to acknowledge this message.
*/
- public void addUnacknowledgedMessage(QueueEntry entry, long deliveryTag, Subscription subscription)
+ public void addUnacknowledgedMessage(MessageInstance entry, long deliveryTag, Subscription subscription)
{
if (_logger.isDebugEnabled())
{
- if (entry.getQueue() == null)
- {
- _logger.debug("Adding unacked message with a null queue:" + entry);
- }
- else
- {
- if (_logger.isDebugEnabled())
- {
_logger.debug(debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag
- + ") with a queue(" + entry.getQueue() + ") for " + subscription);
- }
- }
+ + ") for " + subscription);
+
}
_unacknowledgedMessageMap.add(deliveryTag, entry);
@@ -711,7 +705,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
public void requeue() throws AMQException
{
// we must create a new map since all the messages will get a new delivery tag when they are redelivered
- Collection<QueueEntry> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
+ Collection<MessageInstance> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
if (!messagesToBeDelivered.isEmpty())
{
@@ -722,21 +716,13 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
- for (QueueEntry unacked : messagesToBeDelivered)
+ for (MessageInstance unacked : messagesToBeDelivered)
{
- if (!unacked.isQueueDeleted())
- {
- // Mark message redelivered
- unacked.setRedelivered();
-
- // Ensure message is released for redelivery
- unacked.release();
+ // Mark message redelivered
+ unacked.setRedelivered();
- }
- else
- {
- unacked.delete();
- }
+ // Ensure message is released for redelivery
+ unacked.release();
}
}
@@ -750,7 +736,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
*/
public void requeue(long deliveryTag) throws AMQException
{
- QueueEntry unacked = _unacknowledgedMessageMap.remove(deliveryTag);
+ MessageInstance unacked = _unacknowledgedMessageMap.remove(deliveryTag);
if (unacked != null)
{
@@ -758,20 +744,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
unacked.setRedelivered();
// Ensure message is released for redelivery
- if (!unacked.isQueueDeleted())
- {
-
- // Ensure message is released for redelivery
- unacked.release();
+ unacked.release();
- }
- else
- {
- _logger.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked
- + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message.");
-
- unacked.delete();
- }
}
else
{
@@ -784,10 +758,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
public boolean isMaxDeliveryCountEnabled(final long deliveryTag)
{
- final QueueEntry queueEntry = _unacknowledgedMessageMap.get(deliveryTag);
+ final MessageInstance queueEntry = _unacknowledgedMessageMap.get(deliveryTag);
if (queueEntry != null)
{
- final int maximumDeliveryCount = queueEntry.getQueue().getMaximumDeliveryCount();
+ final int maximumDeliveryCount = queueEntry.getMaximumDeliveryCount();
return maximumDeliveryCount > 0;
}
@@ -796,10 +770,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
public boolean isDeliveredTooManyTimes(final long deliveryTag)
{
- final QueueEntry queueEntry = _unacknowledgedMessageMap.get(deliveryTag);
+ final MessageInstance queueEntry = _unacknowledgedMessageMap.get(deliveryTag);
if (queueEntry != null)
{
- final int maximumDeliveryCount = queueEntry.getQueue().getMaximumDeliveryCount();
+ final int maximumDeliveryCount = queueEntry.getMaximumDeliveryCount();
final int numDeliveries = queueEntry.getDeliveryCount();
return maximumDeliveryCount != 0 && numDeliveries >= maximumDeliveryCount;
}
@@ -818,8 +792,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
{
- final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
- final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
+ final Map<Long, MessageInstance> msgToRequeue = new LinkedHashMap<Long, MessageInstance>();
+ final Map<Long, MessageInstance> msgToResend = new LinkedHashMap<Long, MessageInstance>();
if (_logger.isDebugEnabled())
{
@@ -831,9 +805,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
// and those that don't to be requeued.
_unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap,
msgToRequeue,
- msgToResend,
- requeue,
- _messageStore));
+ msgToResend
+ ));
// Process Messages to Resend
@@ -849,9 +822,9 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
}
- for (Map.Entry<Long, QueueEntry> entry : msgToResend.entrySet())
+ for (Map.Entry<Long, MessageInstance> entry : msgToResend.entrySet())
{
- QueueEntry message = entry.getValue();
+ MessageInstance message = entry.getValue();
long deliveryTag = entry.getKey();
//Amend the delivery counter as the client hasn't seen these messages yet.
@@ -877,9 +850,9 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
// Process Messages to Requeue at the front of the queue
- for (Map.Entry<Long, QueueEntry> entry : msgToRequeue.entrySet())
+ for (Map.Entry<Long, MessageInstance> entry : msgToRequeue.entrySet())
{
- QueueEntry message = entry.getValue();
+ MessageInstance message = entry.getValue();
long deliveryTag = entry.getKey();
//Amend the delivery counter as the client hasn't seen these messages yet.
@@ -905,11 +878,11 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
*/
public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
{
- Collection<QueueEntry> ackedMessages = getAckedMessages(deliveryTag, multiple);
+ Collection<MessageInstance> ackedMessages = getAckedMessages(deliveryTag, multiple);
_transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages));
}
- private Collection<QueueEntry> getAckedMessages(long deliveryTag, boolean multiple)
+ private Collection<MessageInstance> getAckedMessages(long deliveryTag, boolean multiple)
{
return _unacknowledgedMessageMap.acknowledge(deliveryTag, multiple);
@@ -1077,7 +1050,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
postRollbackTask.run();
- for(QueueEntry entry : _resendList)
+ for(MessageInstance entry : _resendList)
{
Subscription sub = entry.getDeliveredSubscription();
if(sub == null || sub.isClosed())
@@ -1152,7 +1125,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod()
{
- public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+ public void recordMessageDelivery(final Subscription sub, final MessageInstance entry, final long deliveryTag)
{
addUnacknowledgedMessage(entry, deliveryTag, sub);
}
@@ -1288,9 +1261,9 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
private class MessageAcknowledgeAction implements ServerTransaction.Action
{
- private final Collection<QueueEntry> _ackedMessages;
+ private final Collection<MessageInstance> _ackedMessages;
- public MessageAcknowledgeAction(Collection<QueueEntry> ackedMessages)
+ public MessageAcknowledgeAction(Collection<MessageInstance> ackedMessages)
{
_ackedMessages = ackedMessages;
}
@@ -1299,7 +1272,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
{
try
{
- for(QueueEntry entry : _ackedMessages)
+ for(MessageInstance entry : _ackedMessages)
{
entry.delete();
}
@@ -1322,7 +1295,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
{
try
{
- for(QueueEntry entry : _ackedMessages)
+ for(MessageInstance entry : _ackedMessages)
{
entry.release();
}
@@ -1490,7 +1463,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
public void deadLetter(long deliveryTag) throws AMQException
{
final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap();
- final QueueEntry rejectedQueueEntry = unackedMap.remove(deliveryTag);
+ final MessageInstance rejectedQueueEntry = unackedMap.remove(deliveryTag);
if (rejectedQueueEntry == null)
{
@@ -1499,6 +1472,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
else
{
final ServerMessage msg = rejectedQueueEntry.getMessage();
+ final Subscription sub = rejectedQueueEntry.getDeliveredSubscription();
int requeues = rejectedQueueEntry.routeToAlternate(new Action<QueueEntry>()
{
@@ -1512,23 +1486,28 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
if(requeues == 0)
{
- final AMQQueue queue = rejectedQueueEntry.getQueue();
- final Exchange altExchange = queue.getAlternateExchange();
-
- if (altExchange == null)
+ final TransactionLogResource owningResource = rejectedQueueEntry.getOwningResource();
+ if(owningResource instanceof AMQQueue)
{
- _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
- _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey()));
+ final AMQQueue queue = (AMQQueue) owningResource;
- }
- else
- {
- _logger.debug(
- "Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: "
- + deliveryTag);
- _actor.message(_logSubject,
- ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName()));
+ final Exchange altExchange = queue.getAlternateExchange();
+
+ if (altExchange == null)
+ {
+ _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
+ _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey()));
+
+ }
+ else
+ {
+ _logger.debug(
+ "Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: "
+ + deliveryTag);
+ _actor.message(_logSubject,
+ ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName()));
+ }
}
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
index 060aebdd65..06c1d79439 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.protocol.v0_8;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.subscription.Subscription;
@@ -35,26 +36,20 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor
{
private static final Logger _log = Logger.getLogger(ExtractResendAndRequeue.class);
- private final Map<Long, QueueEntry> _msgToRequeue;
- private final Map<Long, QueueEntry> _msgToResend;
- private final boolean _requeueIfUnableToResend;
+ private final Map<Long, MessageInstance> _msgToRequeue;
+ private final Map<Long, MessageInstance> _msgToResend;
private final UnacknowledgedMessageMap _unacknowledgedMessageMap;
- private final MessageStore _transactionLog;
public ExtractResendAndRequeue(UnacknowledgedMessageMap unacknowledgedMessageMap,
- Map<Long, QueueEntry> msgToRequeue,
- Map<Long, QueueEntry> msgToResend,
- boolean requeueIfUnableToResend,
- MessageStore txnLog)
+ Map<Long, MessageInstance> msgToRequeue,
+ Map<Long, MessageInstance> msgToResend)
{
_unacknowledgedMessageMap = unacknowledgedMessageMap;
_msgToRequeue = msgToRequeue;
_msgToResend = msgToResend;
- _requeueIfUnableToResend = requeueIfUnableToResend;
- _transactionLog = txnLog;
}
- public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException
+ public boolean callback(final long deliveryTag, MessageInstance message) throws AMQException
{
message.setRedelivered();
@@ -73,58 +68,13 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor
}
else
{
- // Message has no consumer tag, so was "delivered" to a GET
- // or consumer no longer registered
- // cannot resend, so re-queue.
- if (!message.isQueueDeleted())
- {
- if (_requeueIfUnableToResend)
- {
- _msgToRequeue.put(deliveryTag, message);
- }
- else
- {
-
- dequeueEntry(message);
- _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
- }
- }
- else
- {
- dequeueEntry(message);
- _log.warn("Message.queue is null and no DeadLetter Queue so dropping message:" + message);
- }
+ _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
}
// false means continue processing
return false;
}
-
- private void dequeueEntry(final QueueEntry node)
- {
- ServerTransaction txn = new AutoCommitTransaction(_transactionLog);
- dequeueEntry(node, txn);
- }
-
- private void dequeueEntry(final QueueEntry node, ServerTransaction txn)
- {
- txn.dequeue(node.getQueue(), node.getMessage(),
- new ServerTransaction.Action()
- {
-
- public void postCommit()
- {
- node.delete();
- }
-
- public void onRollback()
- {
-
- }
- });
- }
-
public void visitComplete()
{
_unacknowledgedMessageMap.clear();
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java
index 5ba0a2f893..89a5aa55c1 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java
@@ -20,10 +20,10 @@
*/
package org.apache.qpid.server.protocol.v0_8;
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.subscription.Subscription;
public interface RecordDeliveryMethod
{
- void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag);
+ void recordMessageDelivery(final Subscription sub, final MessageInstance entry, final long deliveryTag);
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java
index a9d131a8d2..9a6cec87bd 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java
@@ -27,6 +27,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -102,7 +103,7 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget
* @throws org.apache.qpid.AMQException
*/
@Override
- public void send(QueueEntry entry, boolean batch) throws AMQException
+ public void send(MessageInstance entry, boolean batch) throws AMQException
{
// We don't decrement the reference here as we don't want to consume the message
// but we do want to send it to the client.
@@ -165,7 +166,7 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget
* @throws org.apache.qpid.AMQException
*/
@Override
- public void send(QueueEntry entry, boolean batch) throws AMQException
+ public void send(MessageInstance entry, boolean batch) throws AMQException
{
// if we do not need to wait for client acknowledgements
// we can decrement the reference count immediately.
@@ -176,7 +177,7 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget
// The send may of course still fail, in which case, as
// the message is unacked, it will be lost.
- _txn.dequeue(entry.getQueue(), entry.getMessage(), NOOP);
+ _txn.dequeue(entry.getOwningResource(), entry.getMessage(), NOOP);
ServerMessage message = entry.getMessage();
MessageReference ref = message.newReference();
@@ -281,7 +282,7 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget
* @throws org.apache.qpid.AMQException
*/
@Override
- public void send(QueueEntry entry, boolean batch) throws AMQException
+ public void send(MessageInstance entry, boolean batch) throws AMQException
{
@@ -492,7 +493,7 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget
}
- protected void recordMessageDelivery(final QueueEntry entry, final long deliveryTag)
+ protected void recordMessageDelivery(final MessageInstance entry, final long deliveryTag)
{
_recordMethod.recordMessageDelivery(getSubscription(),entry,deliveryTag);
}
@@ -520,9 +521,9 @@ public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget
_channel.getProtocolSession().flushBatched();
}
- protected void addUnacknowledgedMessage(QueueEntry entry)
+ protected void addUnacknowledgedMessage(MessageInstance entry)
{
- final long size = entry.getSize();
+ final long size = entry.getMessage().getSize();
_unacknowledgedBytes.addAndGet(size);
_unacknowledgedCount.incrementAndGet();
entry.addStateChangeListener(new StateChangeListener<QueueEntry, QueueEntry.State>()
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
index 1d41bcdcf4..fcbbadd507 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.protocol.v0_8;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.queue.QueueEntry;
import java.util.Collection;
@@ -36,24 +37,24 @@ public interface UnacknowledgedMessageMap
*@param message the message being iterated over @return true to stop iteration, false to continue
* @throws AMQException
*/
- boolean callback(final long deliveryTag, QueueEntry message) throws AMQException;
+ boolean callback(final long deliveryTag, MessageInstance message) throws AMQException;
void visitComplete();
}
void visit(Visitor visitor) throws AMQException;
- void add(long deliveryTag, QueueEntry message);
+ void add(long deliveryTag, MessageInstance message);
- QueueEntry remove(long deliveryTag);
+ MessageInstance remove(long deliveryTag);
- Collection<QueueEntry> cancelAllMessages();
+ Collection<MessageInstance> cancelAllMessages();
int size();
void clear();
- QueueEntry get(long deliveryTag);
+ MessageInstance get(long deliveryTag);
/**
* Get the set of delivery tags that are outstanding.
@@ -62,7 +63,7 @@ public interface UnacknowledgedMessageMap
*/
Set<Long> getDeliveryTags();
- Collection<QueueEntry> acknowledge(long deliveryTag, boolean multiple);
+ Collection<MessageInstance> acknowledge(long deliveryTag, boolean multiple);
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
index 17b2c7b985..8d70e769d3 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.protocol.v0_8;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.queue.QueueEntry;
import java.util.Collection;
@@ -34,7 +35,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
private long _unackedSize;
- private Map<Long, QueueEntry> _map;
+ private Map<Long, MessageInstance> _map;
private long _lastDeliveryTag;
@@ -43,10 +44,10 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
public UnacknowledgedMessageMapImpl(int prefetchLimit)
{
_prefetchLimit = prefetchLimit;
- _map = new LinkedHashMap<Long, QueueEntry>(prefetchLimit);
+ _map = new LinkedHashMap<Long, MessageInstance>(prefetchLimit);
}
- public void collect(long deliveryTag, boolean multiple, Map<Long, QueueEntry> msgs)
+ public void collect(long deliveryTag, boolean multiple, Map<Long, MessageInstance> msgs)
{
if (multiple)
{
@@ -54,7 +55,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
}
else
{
- final QueueEntry entry = get(deliveryTag);
+ final MessageInstance entry = get(deliveryTag);
if(entry != null)
{
msgs.put(deliveryTag, entry);
@@ -63,7 +64,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
}
- public void remove(Map<Long,QueueEntry> msgs)
+ public void remove(Map<Long,MessageInstance> msgs)
{
synchronized (_lock)
{
@@ -74,12 +75,12 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
}
}
- public QueueEntry remove(long deliveryTag)
+ public MessageInstance remove(long deliveryTag)
{
synchronized (_lock)
{
- QueueEntry message = _map.remove(deliveryTag);
+ MessageInstance message = _map.remove(deliveryTag);
if(message != null)
{
_unackedSize -= message.getMessage().getSize();
@@ -94,8 +95,8 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
{
synchronized (_lock)
{
- Set<Map.Entry<Long, QueueEntry>> currentEntries = _map.entrySet();
- for (Map.Entry<Long, QueueEntry> entry : currentEntries)
+ Set<Map.Entry<Long, MessageInstance>> currentEntries = _map.entrySet();
+ for (Map.Entry<Long, MessageInstance> entry : currentEntries)
{
visitor.callback(entry.getKey().longValue(), entry.getValue());
}
@@ -103,7 +104,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
}
}
- public void add(long deliveryTag, QueueEntry message)
+ public void add(long deliveryTag, MessageInstance message)
{
synchronized (_lock)
{
@@ -113,12 +114,12 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
}
}
- public Collection<QueueEntry> cancelAllMessages()
+ public Collection<MessageInstance> cancelAllMessages()
{
synchronized (_lock)
{
- Collection<QueueEntry> currentEntries = _map.values();
- _map = new LinkedHashMap<Long, QueueEntry>(_prefetchLimit);
+ Collection<MessageInstance> currentEntries = _map.values();
+ _map = new LinkedHashMap<Long, MessageInstance>(_prefetchLimit);
_unackedSize = 0l;
return currentEntries;
}
@@ -141,7 +142,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
}
}
- public QueueEntry get(long key)
+ public MessageInstance get(long key)
{
synchronized (_lock)
{
@@ -157,19 +158,19 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
}
}
- public Collection<QueueEntry> acknowledge(long deliveryTag, boolean multiple)
+ public Collection<MessageInstance> acknowledge(long deliveryTag, boolean multiple)
{
- Map<Long, QueueEntry> ackedMessageMap = new LinkedHashMap<Long,QueueEntry>();
+ Map<Long, MessageInstance> ackedMessageMap = new LinkedHashMap<Long,MessageInstance>();
collect(deliveryTag, multiple, ackedMessageMap);
remove(ackedMessageMap);
return ackedMessageMap.values();
}
- private void collect(long key, Map<Long, QueueEntry> msgs)
+ private void collect(long key, Map<Long, MessageInstance> msgs)
{
synchronized (_lock)
{
- for (Map.Entry<Long, QueueEntry> entry : _map.entrySet())
+ for (Map.Entry<Long, MessageInstance> entry : _map.entrySet())
{
msgs.put(entry.getKey(),entry.getValue());
if (entry.getKey() == key)
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
index e632fe02b4..53c0ae7381 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
@@ -30,6 +30,7 @@ import org.apache.qpid.framing.BasicGetEmptyBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.flow.FlowCreditManager;
@@ -149,7 +150,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod()
{
- public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+ public void recordMessageDelivery(final Subscription sub, final MessageInstance entry, final long deliveryTag)
{
channel.addUnacknowledgedMessage(entry, deliveryTag, null);
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java
index 90f80a27e7..73559038f3 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java
@@ -24,6 +24,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicRejectBody;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueEntry;
@@ -65,7 +66,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
long deliveryTag = body.getDeliveryTag();
- QueueEntry message = channel.getUnacknowledgedMessageMap().get(deliveryTag);
+ MessageInstance message = channel.getUnacknowledgedMessageMap().get(deliveryTag);
if (message == null)
{
@@ -73,16 +74,6 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
}
else
{
- if (message.isQueueDeleted())
- {
- _logger.warn("Message's Queue has already been purged, dropping message");
- message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
- if(message != null)
- {
- message.delete();
- }
- return;
- }
if (message.getMessage() == null)
{
@@ -100,11 +91,11 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
if (body.getRequeue())
{
- channel.requeue(deliveryTag);
-
//this requeue represents a message rejected from the pre-dispatch queue
//therefore we need to amend the delivery counter.
message.decrementDeliveryCount();
+
+ channel.requeue(deliveryTag);
}
else
{
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
index c3514d3778..d542c82a5b 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
@@ -28,6 +28,7 @@ import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.flow.LimitlessCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.StoredMessage;
@@ -194,8 +195,8 @@ public class AckTest extends QpidTestCase
{
assertTrue(deliveryTag == i);
i++;
- QueueEntry unackedMsg = map.get(deliveryTag);
- assertTrue(unackedMsg.getQueue() == _queue);
+ MessageInstance unackedMsg = map.get(deliveryTag);
+ assertTrue(unackedMsg.getOwningResource() == _queue);
}
}
@@ -275,8 +276,8 @@ public class AckTest extends QpidTestCase
for (long deliveryTag : deliveryTagSet)
{
assertTrue(deliveryTag == i);
- QueueEntry unackedMsg = map.get(deliveryTag);
- assertTrue(unackedMsg.getQueue() == _queue);
+ MessageInstance unackedMsg = map.get(deliveryTag);
+ assertTrue(unackedMsg.getOwningResource() == _queue);
// 5 is the delivery tag of the message that *should* be removed
if (++i == 5)
{
@@ -314,8 +315,8 @@ public class AckTest extends QpidTestCase
for (long deliveryTag : deliveryTagSet)
{
assertTrue(deliveryTag == i + 5);
- QueueEntry unackedMsg = map.get(deliveryTag);
- assertTrue(unackedMsg.getQueue() == _queue);
+ MessageInstance unackedMsg = map.get(deliveryTag);
+ assertTrue(unackedMsg.getOwningResource() == _queue);
++i;
}
}
@@ -346,8 +347,8 @@ public class AckTest extends QpidTestCase
for (long deliveryTag : deliveryTagSet)
{
assertTrue(deliveryTag == i + 5);
- QueueEntry unackedMsg = map.get(deliveryTag);
- assertTrue(unackedMsg.getQueue() == _queue);
+ MessageInstance unackedMsg = map.get(deliveryTag);
+ assertTrue(unackedMsg.getOwningResource() == _queue);
++i;
}
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
index a6e5f29964..9fa9e49c6b 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.protocol.v0_8;
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
@@ -63,7 +64,6 @@ public class ExtractResendAndRequeueTest extends TestCase
private UnacknowledgedMessageMapImpl _unacknowledgedMessageMap;
private static final int INITIAL_MSG_COUNT = 10;
private AMQQueue _queue;
- private MessageStore _messageStore = new TestMemoryMessageStore();
private LinkedList<QueueEntry> _referenceList = new LinkedList<QueueEntry>();
private Subscription _subscription;
private boolean _queueDeleted;
@@ -141,12 +141,12 @@ public class ExtractResendAndRequeueTest extends TestCase
//We don't need the subscription object here.
acquireMessages(_referenceList);
- final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
- final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
+ final Map<Long, MessageInstance> msgToRequeue = new LinkedHashMap<Long, MessageInstance>();
+ final Map<Long, MessageInstance> msgToResend = new LinkedHashMap<Long, MessageInstance>();
// requeueIfUnableToResend doesn't matter here.
_unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
- msgToResend, true, _messageStore));
+ msgToResend));
assertEquals("Message count for resend not correct.", INITIAL_MSG_COUNT, msgToResend.size());
assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());
@@ -170,94 +170,17 @@ public class ExtractResendAndRequeueTest extends TestCase
// Close subscription
when(_subscription.isClosed()).thenReturn(true);
- final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
- final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
+ final Map<Long, MessageInstance> msgToRequeue = new LinkedHashMap<Long, MessageInstance>();
+ final Map<Long, MessageInstance> msgToResend = new LinkedHashMap<Long, MessageInstance>();
// requeueIfUnableToResend doesn't matter here.
_unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
- msgToResend, true, _messageStore));
+ msgToResend));
assertEquals("Message count for resend not correct.", 0, msgToResend.size());
assertEquals("Message count for requeue not correct.", INITIAL_MSG_COUNT, msgToRequeue.size());
assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size());
}
- /**
- * If the subscription is null, due to message being retrieved via a GET, And we request that messages are requeued
- * requeueIfUnableToResend(set to true) then all messages should be sent to the msgToRequeue map.
- *
- * @throws AMQException the visit interface throws this
- */
-
- public void testRequeueDueToMessageHavingNoConsumerTag() throws AMQException
- {
- final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
- final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
-
- // requeueIfUnableToResend = true so all messages should go to msgToRequeue
- _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
- msgToResend, true, _messageStore));
-
- assertEquals("Message count for resend not correct.", 0, msgToResend.size());
- assertEquals("Message count for requeue not correct.", INITIAL_MSG_COUNT, msgToRequeue.size());
- assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size());
- }
-
- /**
- * If the subscription is null, due to message being retrieved via a GET, And we request that we don't
- * requeueIfUnableToResend(set to false) then all messages should be dropped as we do not have a dead letter queue.
- *
- * @throws AMQException the visit interface throws this
- */
-
- public void testDrop() throws AMQException
- {
- final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
- final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
-
- // requeueIfUnableToResend = false so all messages should be dropped all maps should be empty
- _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
- msgToResend, false, _messageStore));
-
- assertEquals("Message count for resend not correct.", 0, msgToResend.size());
- assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());
- assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size());
-
-
- for (QueueEntry entry : _referenceList)
- {
- assertTrue("Message was not discarded", entry.isDeleted());
- }
-
- }
-
- /**
- * If the subscription is null, due to message being retrieved via a GET, AND the queue upon which the message was
- * delivered has been deleted then it is not possible to requeue. Currently we simply discard the message but in the
- * future we may wish to dead letter the message.
- *
- * Validate that at the end of the visit all Maps are empty and all messages are marked as deleted
- *
- * @throws AMQException the visit interface throws this
- */
- public void testDiscard() throws AMQException
- {
- final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
- final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
-
- _queueDeleted = true;
- // requeueIfUnableToResend : value doesn't matter here as queue has been deleted
- _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
- msgToResend, false, _messageStore));
-
- assertEquals("Message count for resend not correct.", 0, msgToResend.size());
- assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());
- assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size());
-
- for (QueueEntry entry : _referenceList)
- {
- assertTrue("Message was not discarded", entry.isDeleted());
- }
- }
}
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index 6cdd6f3119..390944ad3a 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -65,6 +65,7 @@ import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.TopicExchange;
import org.apache.qpid.server.filter.JMSSelectorFilter;
import org.apache.qpid.server.filter.SimpleFilterManager;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
@@ -85,14 +86,14 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
private SubscriptionTarget_1_0 _target;
private boolean _draining;
- private final Map<Binary, QueueEntry> _unsettledMap =
- new HashMap<Binary, QueueEntry>();
+ private final Map<Binary, MessageInstance> _unsettledMap =
+ new HashMap<Binary, MessageInstance>();
private final ConcurrentHashMap<Binary, UnsettledAction> _unsettledActionMap =
new ConcurrentHashMap<Binary, UnsettledAction>();
private volatile SendingLinkAttachment _linkAttachment;
private TerminusDurability _durability;
- private List<QueueEntry> _resumeFullTransfers = new ArrayList<QueueEntry>();
+ private List<MessageInstance> _resumeFullTransfers = new ArrayList<MessageInstance>();
private List<Binary> _resumeAcceptedTransfers = new ArrayList<Binary>();
private Runnable _closeAction;
private final AMQQueue _queue;
@@ -559,7 +560,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
}
}
- public void addUnsettled(Binary tag, UnsettledAction unsettledAction, QueueEntry queueEntry)
+ public void addUnsettled(Binary tag, UnsettledAction unsettledAction, MessageInstance queueEntry)
{
_unsettledActionMap.put(tag,unsettledAction);
if(getTransactionId() == null)
@@ -631,14 +632,14 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
SendingLinkEndpoint endpoint = linkAttachment.getEndpoint();
endpoint.setDeliveryStateHandler(this);
Map initialUnsettledMap = endpoint.getInitialUnsettledMap();
- Map<Binary, QueueEntry> unsettledCopy = new HashMap<Binary, QueueEntry>(_unsettledMap);
+ Map<Binary, MessageInstance> unsettledCopy = new HashMap<Binary, MessageInstance>(_unsettledMap);
_resumeAcceptedTransfers.clear();
_resumeFullTransfers.clear();
- for(Map.Entry<Binary, QueueEntry> entry : unsettledCopy.entrySet())
+ for(Map.Entry<Binary, MessageInstance> entry : unsettledCopy.entrySet())
{
Binary deliveryTag = entry.getKey();
- final QueueEntry queueEntry = entry.getValue();
+ final MessageInstance queueEntry = entry.getValue();
if(initialUnsettledMap == null || !initialUnsettledMap.containsKey(deliveryTag))
{
queueEntry.setRedelivered();
@@ -706,9 +707,9 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
public Map getUnsettledOutcomeMap()
{
- Map<Binary, QueueEntry> unsettled = new HashMap<Binary, QueueEntry>(_unsettledMap);
+ Map<Binary, MessageInstance> unsettled = new HashMap<Binary, MessageInstance>(_unsettledMap);
- for(Map.Entry<Binary, QueueEntry> entry : unsettled.entrySet())
+ for(Map.Entry<Binary, MessageInstance> entry : unsettled.entrySet())
{
entry.setValue(null);
}
@@ -720,4 +721,9 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
{
_closeAction = action;
}
+
+ public VirtualHost getVirtualHost()
+ {
+ return _vhost;
+ }
}
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java
index 14d9934c1d..a3793392d5 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java
@@ -38,6 +38,7 @@ import org.apache.qpid.amqp_1_0.type.messaging.Released;
import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -112,7 +113,7 @@ class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget
}
}
- public void send(QueueEntry entry, boolean batch) throws AMQException
+ public void send(MessageInstance entry, boolean batch) throws AMQException
{
// TODO
send(entry);
@@ -123,7 +124,7 @@ class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget
// TODO
}
- public void send(final QueueEntry queueEntry) throws AMQException
+ public void send(final MessageInstance queueEntry) throws AMQException
{
ServerMessage serverMessage = queueEntry.getMessage();
Message_1_0 message;
@@ -134,7 +135,7 @@ class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget
else
{
final MessageConverter converter = MessageConverterRegistry.getConverter(serverMessage.getClass(), Message_1_0.class);
- message = (Message_1_0) converter.convert(serverMessage, queueEntry.getQueue().getVirtualHost());
+ message = (Message_1_0) converter.convert(serverMessage, _link.getVirtualHost());
}
Transfer transfer = new Transfer();
@@ -344,10 +345,10 @@ class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget
private class DispositionAction implements UnsettledAction
{
- private final QueueEntry _queueEntry;
+ private final MessageInstance _queueEntry;
private final Binary _deliveryTag;
- public DispositionAction(Binary tag, QueueEntry queueEntry)
+ public DispositionAction(Binary tag, MessageInstance queueEntry)
{
_deliveryTag = tag;
_queueEntry = queueEntry;
@@ -378,7 +379,7 @@ class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget
if(outcome instanceof Accepted)
{
- txn.dequeue(_queueEntry.getQueue(), _queueEntry.getMessage(),
+ txn.dequeue(_queueEntry.getOwningResource(), _queueEntry.getMessage(),
new ServerTransaction.Action()
{
@@ -469,7 +470,7 @@ class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget
private class DoNothingAction implements UnsettledAction
{
public DoNothingAction(final Binary tag,
- final QueueEntry queueEntry)
+ final MessageInstance queueEntry)
{
}