summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2008-08-12 09:36:08 +0000
committerMartin Ritchie <ritchiem@apache.org>2008-08-12 09:36:08 +0000
commit8404b22733da9eed0769c4ed4967990ea6611e7d (patch)
tree4118052fabada15d8b674ed27845156f4b3e5461 /qpid/java/broker/src
parent80d8828cea5f482522cdd62f54f602a4b00d3ed6 (diff)
downloadqpid-python-8404b22733da9eed0769c4ed4967990ea6611e7d.tar.gz
QPID-1136 : Fixed Flow Control problem due to this change and added test to validate that Flow Control is operating correctly
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@685104 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java16
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java1
5 files changed, 7 insertions, 17 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
index caf34f13bd..db3a05eb52 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
@@ -116,7 +116,6 @@ public class TxAck implements TxnOp
//make persistent changes, i.e. dequeue and decrementReference
for (QueueEntry msg : _unacked.values())
{
- msg.restoreCredit();
//Message has been ack so discard it. This will dequeue and decrement the reference.
msg.discard(storeContext);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
index ef48b60bcd..c567387662 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
@@ -94,7 +94,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
if(message != null)
{
_unackedSize -= message.getMessage().getSize();
- message.restoreCredit();
+
}
return message;
@@ -185,8 +185,6 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
_unackedSize -= unacked.getValue().getMessage().getSize();
- unacked.getValue().restoreCredit();
-
if (unacked.getKey() == deliveryTag)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index dd967a7cb1..2657c459a9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -175,8 +175,6 @@ public interface QueueEntry extends Comparable<QueueEntry>
void dispose(final StoreContext storeContext) throws MessageCleanupException;
- void restoreCredit();
-
void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException;
boolean isQueueDeleted();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index d26d6af7b2..dbad5438dc 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -256,6 +256,12 @@ public class QueueEntryImpl implements QueueEntry
if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
{
+ if (state instanceof SubscriptionAcquiredState)
+ {
+ Subscription s = ((SubscriptionAcquiredState) state).getSubscription();
+ s.restoreCredit(this);
+ }
+
getQueue().dequeue(storeContext, this);
if(_stateChangeListeners != null)
{
@@ -282,16 +288,6 @@ public class QueueEntryImpl implements QueueEntry
}
}
- public void restoreCredit()
- {
- EntryState state = _state;
- if(state instanceof SubscriptionAcquiredState)
- {
- Subscription s = ((SubscriptionAcquiredState) _state).getSubscription();
- s.restoreCredit(this);
- }
- }
-
public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException
{
//if the queue is null then the message is waiting to be acked, but has been removed.
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index 03d59d3ab9..28af36e3db 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -134,7 +134,6 @@ public class NonTransactionalContext implements TransactionalContext
{
beginTranIfNecessary();
}
- message.restoreCredit();
//Message has been ack so discard it. This will dequeue and decrement the reference.
message.discard(_storeContext);