diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-12-12 14:48:34 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-12-12 14:48:34 +0000 |
| commit | c1b1f97c589be6b5bde0b57da5e3e932728d91e2 (patch) | |
| tree | 9fdaa198d64d6ced3b3575b77bd7edb51556fef5 /qpid/java/broker-core/src | |
| parent | 372b2b83eee4501412199326de903d3be65fd6f8 (diff) | |
| download | qpid-python-c1b1f97c589be6b5bde0b57da5e3e932728d91e2.tar.gz | |
QPID-6268 : [Java Broker] increment/decrement the unacknowledge count on the queue precisely when the queue entry changes state
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1644906 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core/src')
3 files changed, 6 insertions, 2 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 9cfa7dbcf3..aa64d6947d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -71,6 +71,8 @@ public interface AMQQueue<X extends AMQQueue<X>> void decrementUnackedMsgCount(QueueEntry queueEntry); + void incrementUnackedMsgCount(QueueEntry entry); + boolean resend(final QueueEntry entry, final QueueConsumer<?> consumer); List<? extends QueueEntry> getMessagesOnTheQueue(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 8328baca3f..f905558f13 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -1230,7 +1230,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> setLastSeenEntry(sub, entry); _deliveredMessages.incrementAndGet(); - incrementUnackedMsgCount(entry); sub.send(entry, batch); } @@ -2461,13 +2460,15 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> return _unackedMsgBytes.get(); } + @Override public void decrementUnackedMsgCount(QueueEntry queueEntry) { _unackedMsgCount.decrementAndGet(); _unackedMsgBytes.addAndGet(-queueEntry.getSize()); } - private void incrementUnackedMsgCount(QueueEntry entry) + @Override + public void incrementUnackedMsgCount(QueueEntry entry) { _unackedMsgCount.incrementAndGet(); _unackedMsgBytes.addAndGet(entry.getSize()); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index a0f2dc798d..452c5ff14f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -214,6 +214,7 @@ public abstract class QueueEntryImpl implements QueueEntry if(acquired) { _deliveryCountUpdater.compareAndSet(this,-1,0); + getQueue().incrementUnackedMsgCount(this); } return acquired; } |
