summaryrefslogtreecommitdiff
path: root/java/broker/src/main
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2008-10-09 17:43:41 +0000
committerMartin Ritchie <ritchiem@apache.org>2008-10-09 17:43:41 +0000
commit4ff1d4ca6707275d1306d7f94c79b5d066a548ce (patch)
tree64c619f663bf2f098ce442d1d5cfcbfbb56df527 /java/broker/src/main
parent394823bba7976c170ac58e53b5d80ad12e0f1690 (diff)
downloadqpid-python-4ff1d4ca6707275d1306d7f94c79b5d066a548ce.tar.gz
QPID-1331 : Modified the BrowserSubscription to be consistent with the NoAck Subscription.
Added Test QueueBrowserUsesNoAckTest to validate the change. Note that the Credit Manager Suspends the subscriber not the channel when credit is exhausted. JIRA to follow. So to check if the subscription was suspended I needed to make a MockChannel and give it access to the subscriber map in the Channel. The test also needed to be able to interrogate the state of the Subscription which was not part of the Subscription interface, but was used by all subscriptions. So promoted to the interface and implemented the stubs in the various helper/test classes. Added the ability to browse() via the InternalBrokerBaseCase and prevented a NPE when there were no messages returned via getDelivers. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@703212 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java14
3 files changed, 17 insertions, 2 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 3cd343e1b2..a668016f93 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -93,8 +93,7 @@ public class AMQChannel
private IncomingMessage _currentMessage;
/** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */
- private final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>();
-
+ protected final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>();
private final MessageStore _messageStore;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
index 408defe453..9419572399 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
@@ -82,6 +82,8 @@ public interface Subscription
void setStateListener(final StateListener listener);
+ public State getState();
+
QueueEntry getLastSeenEntry();
boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index 556b87590c..df84270275 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -79,6 +79,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
}
+ @Override
public boolean isBrowser()
{
return true;
@@ -91,6 +92,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
* @param msg The message to send
* @throws AMQException
*/
+ @Override
public void send(QueueEntry msg) throws AMQException
{
// We don't decrement the reference here as we don't want to consume the message
@@ -103,6 +105,13 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
}
}
+
+ @Override
+ public boolean wouldSuspend(QueueEntry msg)
+ {
+ return false;
+ }
+
}
public static class NoAckSubscription extends SubscriptionImpl
@@ -118,6 +127,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
}
+ @Override
public boolean isBrowser()
{
return false;
@@ -130,6 +140,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
* @param entry The message to send
* @throws AMQException
*/
+ @Override
public void send(QueueEntry entry) throws AMQException
{
@@ -166,6 +177,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
}
}
+ @Override
public boolean wouldSuspend(QueueEntry msg)
{
return false;
@@ -185,6 +197,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
}
+ @Override
public boolean isBrowser()
{
return false;
@@ -198,6 +211,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
* @param entry The message to send
* @throws AMQException
*/
+ @Override
public void send(QueueEntry entry) throws AMQException
{