diff options
| author | Robert Gemmell <robbie@apache.org> | 2011-11-17 18:38:20 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2011-11-17 18:38:20 +0000 |
| commit | 7d07cd053fe2dcf8923774fed40db54bec18cc7c (patch) | |
| tree | 3ff2d4dba19a14ba2da200a354bf46560c8bad62 /qpid/java/broker/src | |
| parent | 932ebedd275f667e013aa5c1a3cafeee15d2afae (diff) | |
| download | qpid-python-7d07cd053fe2dcf8923774fed40db54bec18cc7c.tar.gz | |
QPID-2703: 0-8..0-9-1 Transaction rollback/recover does not restore consumer credit.
This change restores consumer credit after rollback/recover by restoring credit on reciept of basic.reject from the consumer.
This change is basically as QPID-2506, but with additional changes to avoid the 0-10 path. Work by Robbie Gemmell and myself.
merged from trunk r1203137
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.14@1203316 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src')
7 files changed, 26 insertions, 17 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java index 9623be595c..fda8cd0eb0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java @@ -167,18 +167,6 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl } - public void stop() - { - if(_bytesCreditLimit > 0) - { - _bytesCreditLimit = 0; - } - if(_messageCreditLimit > 0) - { - _messageCreditLimit = 0; - } - - } public synchronized void addCredit(long count, long bytes) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 5b57e40a82..3d011b99c0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -233,8 +233,13 @@ public class QueueEntryImpl implements QueueEntry if(state instanceof SubscriptionAcquiredState) { getQueue().decrementUnackedMsgCount(); + Subscription subscription = ((SubscriptionAcquiredState)state).getSubscription(); + if (subscription != null) + { + subscription.releaseQueueEntry(this); + } } - + if(!getQueue().isDeleted()) { getQueue().requeue(this); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index a095ef47ea..ab47d89e01 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -681,7 +681,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { // restore credit here that would have been taken away by wouldSuspend since we didn't manage // to acquire the entry for this subscription - sub.onDequeue(entry); + sub.restoreCredit(entry); } else { @@ -1659,7 +1659,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { if (sub.acquires() && !node.acquire(sub)) { - sub.onDequeue(node); + // restore credit here that would have been taken away by wouldSuspend since we didn't manage + // to acquire the entry for this subscription + sub.restoreCredit(node); } else { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java index 0a3576ff42..3a950c2f4f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java @@ -84,6 +84,8 @@ public interface Subscription void releaseSendLock(); + void releaseQueueEntry(final QueueEntry queueEntryImpl); + void onDequeue(final QueueEntry queueEntry); void restoreCredit(final QueueEntry queueEntry); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index 6603f58104..8b11a5817a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -623,13 +623,16 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage restoreCredit(queueEntry); } + public void releaseQueueEntry(final QueueEntry queueEntry) + { + restoreCredit(queueEntry); + } + public void restoreCredit(final QueueEntry queueEntry) { _creditManager.restoreCredit(1, queueEntry.getSize()); } - - public void creditStateChanged(boolean hasCredit) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index 9d52901fef..0a90d07771 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -675,7 +675,12 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr public void onDequeue(QueueEntry queueEntry) { + // no-op for 0-10, credit restored by completing command. + } + public void releaseQueueEntry(QueueEntry queueEntry) + { + // no-op for 0-10, credit restored by completing command. } public void setStateListener(StateListener listener) diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java index 6fbc627d8c..1efe1028db 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java @@ -212,6 +212,10 @@ public class MockSubscription implements Subscription { } + public void releaseQueueEntry(QueueEntry queueEntry) + { + } + public void send(QueueEntry entry) throws AMQException { if (messages.contains(entry)) |
