summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-11-17 18:38:20 +0000
committerRobert Gemmell <robbie@apache.org>2011-11-17 18:38:20 +0000
commit7d07cd053fe2dcf8923774fed40db54bec18cc7c (patch)
tree3ff2d4dba19a14ba2da200a354bf46560c8bad62 /qpid/java/broker/src
parent932ebedd275f667e013aa5c1a3cafeee15d2afae (diff)
downloadqpid-python-7d07cd053fe2dcf8923774fed40db54bec18cc7c.tar.gz
QPID-2703: 0-8..0-9-1 Transaction rollback/recover does not restore consumer credit.
This change restores consumer credit after rollback/recover by restoring credit on reciept of basic.reject from the consumer. This change is basically as QPID-2506, but with additional changes to avoid the 0-10 path. Work by Robbie Gemmell and myself. merged from trunk r1203137 git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.14@1203316 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java12
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java5
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java4
7 files changed, 26 insertions, 17 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java
index 9623be595c..fda8cd0eb0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java
@@ -167,18 +167,6 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl
}
- public void stop()
- {
- if(_bytesCreditLimit > 0)
- {
- _bytesCreditLimit = 0;
- }
- if(_messageCreditLimit > 0)
- {
- _messageCreditLimit = 0;
- }
-
- }
public synchronized void addCredit(long count, long bytes)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 5b57e40a82..3d011b99c0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -233,8 +233,13 @@ public class QueueEntryImpl implements QueueEntry
if(state instanceof SubscriptionAcquiredState)
{
getQueue().decrementUnackedMsgCount();
+ Subscription subscription = ((SubscriptionAcquiredState)state).getSubscription();
+ if (subscription != null)
+ {
+ subscription.releaseQueueEntry(this);
+ }
}
-
+
if(!getQueue().isDeleted())
{
getQueue().requeue(this);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index a095ef47ea..ab47d89e01 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -681,7 +681,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
// restore credit here that would have been taken away by wouldSuspend since we didn't manage
// to acquire the entry for this subscription
- sub.onDequeue(entry);
+ sub.restoreCredit(entry);
}
else
{
@@ -1659,7 +1659,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
if (sub.acquires() && !node.acquire(sub))
{
- sub.onDequeue(node);
+ // restore credit here that would have been taken away by wouldSuspend since we didn't manage
+ // to acquire the entry for this subscription
+ sub.restoreCredit(node);
}
else
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
index 0a3576ff42..3a950c2f4f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
@@ -84,6 +84,8 @@ public interface Subscription
void releaseSendLock();
+ void releaseQueueEntry(final QueueEntry queueEntryImpl);
+
void onDequeue(final QueueEntry queueEntry);
void restoreCredit(final QueueEntry queueEntry);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index 6603f58104..8b11a5817a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -623,13 +623,16 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
restoreCredit(queueEntry);
}
+ public void releaseQueueEntry(final QueueEntry queueEntry)
+ {
+ restoreCredit(queueEntry);
+ }
+
public void restoreCredit(final QueueEntry queueEntry)
{
_creditManager.restoreCredit(1, queueEntry.getSize());
}
-
-
public void creditStateChanged(boolean hasCredit)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index 9d52901fef..0a90d07771 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -675,7 +675,12 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
public void onDequeue(QueueEntry queueEntry)
{
+ // no-op for 0-10, credit restored by completing command.
+ }
+ public void releaseQueueEntry(QueueEntry queueEntry)
+ {
+ // no-op for 0-10, credit restored by completing command.
}
public void setStateListener(StateListener listener)
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
index 6fbc627d8c..1efe1028db 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
@@ -212,6 +212,10 @@ public class MockSubscription implements Subscription
{
}
+ public void releaseQueueEntry(QueueEntry queueEntry)
+ {
+ }
+
public void send(QueueEntry entry) throws AMQException
{
if (messages.contains(entry))