From c1b1f97c589be6b5bde0b57da5e3e932728d91e2 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Fri, 12 Dec 2014 14:48:34 +0000 Subject: QPID-6268 : [Java Broker] increment/decrement the unacknowledge count on the queue precisely when the queue entry changes state git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1644906 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/main/java/org/apache/qpid/server/queue/AMQQueue.java | 2 ++ .../src/main/java/org/apache/qpid/server/queue/AbstractQueue.java | 5 +++-- .../src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java | 1 + 3 files changed, 6 insertions(+), 2 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 9cfa7dbcf3..aa64d6947d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -71,6 +71,8 @@ public interface AMQQueue> void decrementUnackedMsgCount(QueueEntry queueEntry); + void incrementUnackedMsgCount(QueueEntry entry); + boolean resend(final QueueEntry entry, final QueueConsumer consumer); List getMessagesOnTheQueue(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 8328baca3f..f905558f13 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -1230,7 +1230,6 @@ public abstract class AbstractQueue> setLastSeenEntry(sub, entry); _deliveredMessages.incrementAndGet(); - incrementUnackedMsgCount(entry); sub.send(entry, batch); } @@ -2461,13 +2460,15 @@ public abstract class AbstractQueue> return _unackedMsgBytes.get(); } + @Override public void decrementUnackedMsgCount(QueueEntry queueEntry) { _unackedMsgCount.decrementAndGet(); _unackedMsgBytes.addAndGet(-queueEntry.getSize()); } - private void incrementUnackedMsgCount(QueueEntry entry) + @Override + public void incrementUnackedMsgCount(QueueEntry entry) { _unackedMsgCount.incrementAndGet(); _unackedMsgBytes.addAndGet(entry.getSize()); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index a0f2dc798d..452c5ff14f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -214,6 +214,7 @@ public abstract class QueueEntryImpl implements QueueEntry if(acquired) { _deliveryCountUpdater.compareAndSet(this,-1,0); + getQueue().incrementUnackedMsgCount(this); } return acquired; } -- cgit v1.2.1