summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java1
3 files changed, 6 insertions, 2 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 9cfa7dbcf3..aa64d6947d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -71,6 +71,8 @@ public interface AMQQueue<X extends AMQQueue<X>>
void decrementUnackedMsgCount(QueueEntry queueEntry);
+ void incrementUnackedMsgCount(QueueEntry entry);
+
boolean resend(final QueueEntry entry, final QueueConsumer<?> consumer);
List<? extends QueueEntry> getMessagesOnTheQueue();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 8328baca3f..f905558f13 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -1230,7 +1230,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
setLastSeenEntry(sub, entry);
_deliveredMessages.incrementAndGet();
- incrementUnackedMsgCount(entry);
sub.send(entry, batch);
}
@@ -2461,13 +2460,15 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
return _unackedMsgBytes.get();
}
+ @Override
public void decrementUnackedMsgCount(QueueEntry queueEntry)
{
_unackedMsgCount.decrementAndGet();
_unackedMsgBytes.addAndGet(-queueEntry.getSize());
}
- private void incrementUnackedMsgCount(QueueEntry entry)
+ @Override
+ public void incrementUnackedMsgCount(QueueEntry entry)
{
_unackedMsgCount.incrementAndGet();
_unackedMsgBytes.addAndGet(entry.getSize());
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index a0f2dc798d..452c5ff14f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -214,6 +214,7 @@ public abstract class QueueEntryImpl implements QueueEntry
if(acquired)
{
_deliveryCountUpdater.compareAndSet(this,-1,0);
+ getQueue().incrementUnackedMsgCount(this);
}
return acquired;
}