diff options
Diffstat (limited to 'qpid/java/broker/src/main')
3 files changed, 17 insertions, 2 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 3cd343e1b2..a668016f93 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -93,8 +93,7 @@ public class AMQChannel private IncomingMessage _currentMessage; /** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */ - private final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>(); - + protected final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>(); private final MessageStore _messageStore; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java index 408defe453..9419572399 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java @@ -82,6 +82,8 @@ public interface Subscription void setStateListener(final StateListener listener); + public State getState(); + QueueEntry getLastSeenEntry(); boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index 556b87590c..df84270275 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -79,6 +79,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage } + @Override public boolean isBrowser() { return true; @@ -91,6 +92,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage * @param msg The message to send * @throws AMQException */ + @Override public void send(QueueEntry msg) throws AMQException { // We don't decrement the reference here as we don't want to consume the message @@ -103,6 +105,13 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage } } + + @Override + public boolean wouldSuspend(QueueEntry msg) + { + return false; + } + } public static class NoAckSubscription extends SubscriptionImpl @@ -118,6 +127,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage } + @Override public boolean isBrowser() { return false; @@ -130,6 +140,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage * @param entry The message to send * @throws AMQException */ + @Override public void send(QueueEntry entry) throws AMQException { @@ -166,6 +177,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage } } + @Override public boolean wouldSuspend(QueueEntry msg) { return false; @@ -185,6 +197,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod); } + @Override public boolean isBrowser() { return false; @@ -198,6 +211,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage * @param entry The message to send * @throws AMQException */ + @Override public void send(QueueEntry entry) throws AMQException { |
