diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2008-08-12 09:36:08 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2008-08-12 09:36:08 +0000 |
| commit | 8404b22733da9eed0769c4ed4967990ea6611e7d (patch) | |
| tree | 4118052fabada15d8b674ed27845156f4b3e5461 /qpid/java/broker/src | |
| parent | 80d8828cea5f482522cdd62f54f602a4b00d3ed6 (diff) | |
| download | qpid-python-8404b22733da9eed0769c4ed4967990ea6611e7d.tar.gz | |
QPID-1136 : Fixed Flow Control problem due to this change and added test to validate that Flow Control is operating correctly
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@685104 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src')
5 files changed, 7 insertions, 17 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java index caf34f13bd..db3a05eb52 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java @@ -116,7 +116,6 @@ public class TxAck implements TxnOp //make persistent changes, i.e. dequeue and decrementReference for (QueueEntry msg : _unacked.values()) { - msg.restoreCredit(); //Message has been ack so discard it. This will dequeue and decrement the reference. msg.discard(storeContext); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java index ef48b60bcd..c567387662 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java @@ -94,7 +94,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap if(message != null) { _unackedSize -= message.getMessage().getSize(); - message.restoreCredit(); + } return message; @@ -185,8 +185,6 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap _unackedSize -= unacked.getValue().getMessage().getSize(); - unacked.getValue().restoreCredit(); - if (unacked.getKey() == deliveryTag) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index dd967a7cb1..2657c459a9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -175,8 +175,6 @@ public interface QueueEntry extends Comparable<QueueEntry> void dispose(final StoreContext storeContext) throws MessageCleanupException; - void restoreCredit(); - void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException; boolean isQueueDeleted(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index d26d6af7b2..dbad5438dc 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -256,6 +256,12 @@ public class QueueEntryImpl implements QueueEntry if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE)) { + if (state instanceof SubscriptionAcquiredState) + { + Subscription s = ((SubscriptionAcquiredState) state).getSubscription(); + s.restoreCredit(this); + } + getQueue().dequeue(storeContext, this); if(_stateChangeListeners != null) { @@ -282,16 +288,6 @@ public class QueueEntryImpl implements QueueEntry } } - public void restoreCredit() - { - EntryState state = _state; - if(state instanceof SubscriptionAcquiredState) - { - Subscription s = ((SubscriptionAcquiredState) _state).getSubscription(); - s.restoreCredit(this); - } - } - public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException { //if the queue is null then the message is waiting to be acked, but has been removed. diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index 03d59d3ab9..28af36e3db 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -134,7 +134,6 @@ public class NonTransactionalContext implements TransactionalContext { beginTranIfNecessary(); } - message.restoreCredit(); //Message has been ack so discard it. This will dequeue and decrement the reference. message.discard(_storeContext); |
