From 6a382bbcd88cd49fb1437b756a2816212ddebefe Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Tue, 12 Aug 2008 09:36:08 +0000 Subject: QPID-1136 : Fixed Flow Control problem due to this change and added test to validate that Flow Control is operating correctly git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@685104 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/main/java/org/apache/qpid/server/ack/TxAck.java | 1 - .../qpid/server/ack/UnacknowledgedMessageMapImpl.java | 4 +--- .../java/org/apache/qpid/server/queue/QueueEntry.java | 2 -- .../org/apache/qpid/server/queue/QueueEntryImpl.java | 16 ++++++---------- .../apache/qpid/server/txn/NonTransactionalContext.java | 1 - 5 files changed, 7 insertions(+), 17 deletions(-) (limited to 'java/broker') diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java index caf34f13bd..db3a05eb52 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java @@ -116,7 +116,6 @@ public class TxAck implements TxnOp //make persistent changes, i.e. dequeue and decrementReference for (QueueEntry msg : _unacked.values()) { - msg.restoreCredit(); //Message has been ack so discard it. This will dequeue and decrement the reference. msg.discard(storeContext); diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java index ef48b60bcd..c567387662 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java @@ -94,7 +94,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap if(message != null) { _unackedSize -= message.getMessage().getSize(); - message.restoreCredit(); + } return message; @@ -185,8 +185,6 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap _unackedSize -= unacked.getValue().getMessage().getSize(); - unacked.getValue().restoreCredit(); - if (unacked.getKey() == deliveryTag) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index dd967a7cb1..2657c459a9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -175,8 +175,6 @@ public interface QueueEntry extends Comparable void dispose(final StoreContext storeContext) throws MessageCleanupException; - void restoreCredit(); - void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException; boolean isQueueDeleted(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index d26d6af7b2..dbad5438dc 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -256,6 +256,12 @@ public class QueueEntryImpl implements QueueEntry if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE)) { + if (state instanceof SubscriptionAcquiredState) + { + Subscription s = ((SubscriptionAcquiredState) state).getSubscription(); + s.restoreCredit(this); + } + getQueue().dequeue(storeContext, this); if(_stateChangeListeners != null) { @@ -282,16 +288,6 @@ public class QueueEntryImpl implements QueueEntry } } - public void restoreCredit() - { - EntryState state = _state; - if(state instanceof SubscriptionAcquiredState) - { - Subscription s = ((SubscriptionAcquiredState) _state).getSubscription(); - s.restoreCredit(this); - } - } - public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException { //if the queue is null then the message is waiting to be acked, but has been removed. diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index 03d59d3ab9..28af36e3db 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -134,7 +134,6 @@ public class NonTransactionalContext implements TransactionalContext { beginTranIfNecessary(); } - message.restoreCredit(); //Message has been ack so discard it. This will dequeue and decrement the reference. message.discard(_storeContext); -- cgit v1.2.1