diff options
| author | Keith Wall <kwall@apache.org> | 2015-02-12 12:00:06 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2015-02-12 12:00:06 +0000 |
| commit | 825aceb7e885c793309557a3a886f10c475c4c1c (patch) | |
| tree | 11a5b5926a714c956f6cd77f56b373e96755df13 /qpid/java/broker-core | |
| parent | 5701b4ba67b8e475326acfc9f28735aead8d9dfc (diff) | |
| download | qpid-python-825aceb7e885c793309557a3a886f10c475c4c1c.tar.gz | |
broswer consumer close is now pulled by IO rather than pushed by queue, fixing browser tests
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1659232 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core')
4 files changed, 9 insertions, 4 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java index e32613d700..be4ac9d427 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java @@ -54,14 +54,18 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget } @Override - public void processPendingMessages() + public void processPending() { while(hasMessagesToSend()) { sendNextMessage(); } + + processClosed(); } + protected abstract void processClosed(); + @Override public final boolean isSuspended() { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java index 32b12d2a44..cef566793f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java @@ -33,7 +33,7 @@ public interface ConsumerTarget void removeStateChangeListener(StateChangeListener<ConsumerTarget, State> listener); - void processPendingMessages(); + void processPending(); enum State { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index a545ce6e10..d3ce911406 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -2141,7 +2141,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> if (consumerDone) { sub.flushBatched(); - if (lastLoop && !sub.isSuspended()) + boolean noMore = getNextAvailableEntry(sub) == null; + if (lastLoop && noMore) { sub.queueEmpty(); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java index 53f1632933..caba0bd1d8 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java @@ -249,7 +249,7 @@ public class MockConsumer implements ConsumerTarget } @Override - public void processPendingMessages() + public void processPending() { } |
