diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2008-08-12 09:36:08 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2008-08-12 09:36:08 +0000 |
| commit | 6a382bbcd88cd49fb1437b756a2816212ddebefe (patch) | |
| tree | 6e751937d20ef72ef8c9757cc21cff9b8f921ae7 /java/broker/src/main | |
| parent | 1f40aff0bb0351b05cdc0f2463da846f10e9a859 (diff) | |
| download | qpid-python-6a382bbcd88cd49fb1437b756a2816212ddebefe.tar.gz | |
QPID-1136 : Fixed Flow Control problem due to this change and added test to validate that Flow Control is operating correctly
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@685104 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main')
5 files changed, 7 insertions, 17 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java index caf34f13bd..db3a05eb52 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java @@ -116,7 +116,6 @@ public class TxAck implements TxnOp //make persistent changes, i.e. dequeue and decrementReference for (QueueEntry msg : _unacked.values()) { - msg.restoreCredit(); //Message has been ack so discard it. This will dequeue and decrement the reference. msg.discard(storeContext); diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java index ef48b60bcd..c567387662 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java @@ -94,7 +94,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap if(message != null) { _unackedSize -= message.getMessage().getSize(); - message.restoreCredit(); + } return message; @@ -185,8 +185,6 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap _unackedSize -= unacked.getValue().getMessage().getSize(); - unacked.getValue().restoreCredit(); - if (unacked.getKey() == deliveryTag) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index dd967a7cb1..2657c459a9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -175,8 +175,6 @@ public interface QueueEntry extends Comparable<QueueEntry> void dispose(final StoreContext storeContext) throws MessageCleanupException; - void restoreCredit(); - void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException; boolean isQueueDeleted(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index d26d6af7b2..dbad5438dc 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -256,6 +256,12 @@ public class QueueEntryImpl implements QueueEntry if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE)) { + if (state instanceof SubscriptionAcquiredState) + { + Subscription s = ((SubscriptionAcquiredState) state).getSubscription(); + s.restoreCredit(this); + } + getQueue().dequeue(storeContext, this); if(_stateChangeListeners != null) { @@ -282,16 +288,6 @@ public class QueueEntryImpl implements QueueEntry } } - public void restoreCredit() - { - EntryState state = _state; - if(state instanceof SubscriptionAcquiredState) - { - Subscription s = ((SubscriptionAcquiredState) _state).getSubscription(); - s.restoreCredit(this); - } - } - public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException { //if the queue is null then the message is waiting to be acked, but has been removed. diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index 03d59d3ab9..28af36e3db 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -134,7 +134,6 @@ public class NonTransactionalContext implements TransactionalContext { beginTranIfNecessary(); } - message.restoreCredit(); //Message has been ack so discard it. This will dequeue and decrement the reference. message.discard(_storeContext); |
