summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java14
3 files changed, 17 insertions, 2 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 3cd343e1b2..a668016f93 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -93,8 +93,7 @@ public class AMQChannel
private IncomingMessage _currentMessage;
/** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */
- private final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>();
-
+ protected final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>();
private final MessageStore _messageStore;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
index 408defe453..9419572399 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
@@ -82,6 +82,8 @@ public interface Subscription
void setStateListener(final StateListener listener);
+ public State getState();
+
QueueEntry getLastSeenEntry();
boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index 556b87590c..df84270275 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -79,6 +79,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
}
+ @Override
public boolean isBrowser()
{
return true;
@@ -91,6 +92,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
* @param msg The message to send
* @throws AMQException
*/
+ @Override
public void send(QueueEntry msg) throws AMQException
{
// We don't decrement the reference here as we don't want to consume the message
@@ -103,6 +105,13 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
}
}
+
+ @Override
+ public boolean wouldSuspend(QueueEntry msg)
+ {
+ return false;
+ }
+
}
public static class NoAckSubscription extends SubscriptionImpl
@@ -118,6 +127,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
}
+ @Override
public boolean isBrowser()
{
return false;
@@ -130,6 +140,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
* @param entry The message to send
* @throws AMQException
*/
+ @Override
public void send(QueueEntry entry) throws AMQException
{
@@ -166,6 +177,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
}
}
+ @Override
public boolean wouldSuspend(QueueEntry msg)
{
return false;
@@ -185,6 +197,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
}
+ @Override
public boolean isBrowser()
{
return false;
@@ -198,6 +211,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
* @param entry The message to send
* @throws AMQException
*/
+ @Override
public void send(QueueEntry entry) throws AMQException
{