summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2015-02-12 12:00:06 +0000
committerKeith Wall <kwall@apache.org>2015-02-12 12:00:06 +0000
commit825aceb7e885c793309557a3a886f10c475c4c1c (patch)
tree11a5b5926a714c956f6cd77f56b373e96755df13 /qpid/java/broker-core
parent5701b4ba67b8e475326acfc9f28735aead8d9dfc (diff)
downloadqpid-python-825aceb7e885c793309557a3a886f10c475c4c1c.tar.gz
broswer consumer close is now pulled by IO rather than pushed by queue, fixing browser tests
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1659232 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java3
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java2
4 files changed, 9 insertions, 4 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
index e32613d700..be4ac9d427 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
@@ -54,14 +54,18 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget
}
@Override
- public void processPendingMessages()
+ public void processPending()
{
while(hasMessagesToSend())
{
sendNextMessage();
}
+
+ processClosed();
}
+ protected abstract void processClosed();
+
@Override
public final boolean isSuspended()
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
index 32b12d2a44..cef566793f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
@@ -33,7 +33,7 @@ public interface ConsumerTarget
void removeStateChangeListener(StateChangeListener<ConsumerTarget, State> listener);
- void processPendingMessages();
+ void processPending();
enum State
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index a545ce6e10..d3ce911406 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -2141,7 +2141,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
if (consumerDone)
{
sub.flushBatched();
- if (lastLoop && !sub.isSuspended())
+ boolean noMore = getNextAvailableEntry(sub) == null;
+ if (lastLoop && noMore)
{
sub.queueEmpty();
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
index 53f1632933..caba0bd1d8 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
@@ -249,7 +249,7 @@ public class MockConsumer implements ConsumerTarget
}
@Override
- public void processPendingMessages()
+ public void processPending()
{
}