diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2008-10-09 17:43:41 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2008-10-09 17:43:41 +0000 |
| commit | 4ff1d4ca6707275d1306d7f94c79b5d066a548ce (patch) | |
| tree | 64c619f663bf2f098ce442d1d5cfcbfbb56df527 /java/broker/src/main | |
| parent | 394823bba7976c170ac58e53b5d80ad12e0f1690 (diff) | |
| download | qpid-python-4ff1d4ca6707275d1306d7f94c79b5d066a548ce.tar.gz | |
QPID-1331 : Modified the BrowserSubscription to be consistent with the NoAck Subscription.
Added Test QueueBrowserUsesNoAckTest to validate the change.
Note that the Credit Manager Suspends the subscriber not the channel when credit is exhausted. JIRA to follow.
So to check if the subscription was suspended I needed to make a MockChannel and give it access to the subscriber map in the
Channel.
The test also needed to be able to interrogate the state of the Subscription which was not part of the Subscription interface, but was used by all subscriptions. So promoted to the interface and implemented the stubs in the various helper/test classes.
Added the ability to browse() via the InternalBrokerBaseCase and prevented a NPE when there were no messages returned via getDelivers.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@703212 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main')
3 files changed, 17 insertions, 2 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 3cd343e1b2..a668016f93 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -93,8 +93,7 @@ public class AMQChannel private IncomingMessage _currentMessage; /** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */ - private final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>(); - + protected final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>(); private final MessageStore _messageStore; diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java index 408defe453..9419572399 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java @@ -82,6 +82,8 @@ public interface Subscription void setStateListener(final StateListener listener); + public State getState(); + QueueEntry getLastSeenEntry(); boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue); diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index 556b87590c..df84270275 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -79,6 +79,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage } + @Override public boolean isBrowser() { return true; @@ -91,6 +92,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage * @param msg The message to send * @throws AMQException */ + @Override public void send(QueueEntry msg) throws AMQException { // We don't decrement the reference here as we don't want to consume the message @@ -103,6 +105,13 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage } } + + @Override + public boolean wouldSuspend(QueueEntry msg) + { + return false; + } + } public static class NoAckSubscription extends SubscriptionImpl @@ -118,6 +127,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage } + @Override public boolean isBrowser() { return false; @@ -130,6 +140,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage * @param entry The message to send * @throws AMQException */ + @Override public void send(QueueEntry entry) throws AMQException { @@ -166,6 +177,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage } } + @Override public boolean wouldSuspend(QueueEntry msg) { return false; @@ -185,6 +197,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod); } + @Override public boolean isBrowser() { return false; @@ -198,6 +211,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage * @param entry The message to send * @throws AMQException */ + @Override public void send(QueueEntry entry) throws AMQException { |
