summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-12-12 14:48:34 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-12-12 14:48:34 +0000
commitc1b1f97c589be6b5bde0b57da5e3e932728d91e2 (patch)
tree9fdaa198d64d6ced3b3575b77bd7edb51556fef5 /qpid/java/broker-core/src
parent372b2b83eee4501412199326de903d3be65fd6f8 (diff)
downloadqpid-python-c1b1f97c589be6b5bde0b57da5e3e932728d91e2.tar.gz
QPID-6268 : [Java Broker] increment/decrement the unacknowledge count on the queue precisely when the queue entry changes state
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1644906 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core/src')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java1
3 files changed, 6 insertions, 2 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 9cfa7dbcf3..aa64d6947d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -71,6 +71,8 @@ public interface AMQQueue<X extends AMQQueue<X>>
void decrementUnackedMsgCount(QueueEntry queueEntry);
+ void incrementUnackedMsgCount(QueueEntry entry);
+
boolean resend(final QueueEntry entry, final QueueConsumer<?> consumer);
List<? extends QueueEntry> getMessagesOnTheQueue();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 8328baca3f..f905558f13 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -1230,7 +1230,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
setLastSeenEntry(sub, entry);
_deliveredMessages.incrementAndGet();
- incrementUnackedMsgCount(entry);
sub.send(entry, batch);
}
@@ -2461,13 +2460,15 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
return _unackedMsgBytes.get();
}
+ @Override
public void decrementUnackedMsgCount(QueueEntry queueEntry)
{
_unackedMsgCount.decrementAndGet();
_unackedMsgBytes.addAndGet(-queueEntry.getSize());
}
- private void incrementUnackedMsgCount(QueueEntry entry)
+ @Override
+ public void incrementUnackedMsgCount(QueueEntry entry)
{
_unackedMsgCount.incrementAndGet();
_unackedMsgBytes.addAndGet(entry.getSize());
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index a0f2dc798d..452c5ff14f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -214,6 +214,7 @@ public abstract class QueueEntryImpl implements QueueEntry
if(acquired)
{
_deliveryCountUpdater.compareAndSet(this,-1,0);
+ getQueue().incrementUnackedMsgCount(this);
}
return acquired;
}