summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-01-25 14:40:30 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-01-25 14:40:30 +0000
commit66e32c70039212e13bccb9d31a32f049ebe74e13 (patch)
tree58c46008fd25450bededac44396c6ab62b88cf39 /java
parentd19a93f48676e9124bcc1fb943abbe628c60acf8 (diff)
downloadqpid-python-66e32c70039212e13bccb9d31a32f049ebe74e13.tar.gz
QPID-3780 : [Java Broker] reduce scavenge overhead on large queues
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1235771 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java44
1 files changed, 26 insertions, 18 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
index b40e5a28c2..af0a654372 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.qpid.server.message.ServerMessage;
@@ -46,6 +47,7 @@ public class SimpleQueueEntryList implements QueueEntryList<SimpleQueueEntryImpl
private AtomicLong _scavenges = new AtomicLong(0L);
private final long _scavengeCount = Integer.getInteger("qpid.queue.scavenge_count", 50);
+ private final AtomicReference<SimpleQueueEntryImpl> _unscavengedHWM = new AtomicReference<SimpleQueueEntryImpl>();
public SimpleQueueEntryList(AMQQueue queue)
@@ -55,28 +57,17 @@ public class SimpleQueueEntryList implements QueueEntryList<SimpleQueueEntryImpl
_tail = _head;
}
- void advanceHead()
- {
- SimpleQueueEntryImpl next = _head.getNextNode();
- SimpleQueueEntryImpl newNext = _head.getNextValidEntry();
-
- if (next == newNext)
- {
- if (_scavenges.incrementAndGet() > _scavengeCount)
- {
- _scavenges.set(0L);
- scavenge();
- }
- }
- }
-
void scavenge()
{
+ SimpleQueueEntryImpl hwm = _unscavengedHWM.getAndSet(null);
SimpleQueueEntryImpl next = _head.getNextValidEntry();
- while (next != null)
+ if(hwm != null)
{
- next = next.getNextValidEntry();
+ while (next != null && hwm.compareTo(next)>0)
+ {
+ next = next.getNextValidEntry();
+ }
}
}
@@ -182,7 +173,24 @@ public class SimpleQueueEntryList implements QueueEntryList<SimpleQueueEntryImpl
public void entryDeleted(SimpleQueueEntryImpl queueEntry)
{
- advanceHead();
+ SimpleQueueEntryImpl next = _head.getNextNode();
+ SimpleQueueEntryImpl newNext = _head.getNextValidEntry();
+
+ // the head of the queue has not been deleted, hence the deletion must have been mid queue.
+ if (next == newNext)
+ {
+ SimpleQueueEntryImpl unscavengedHWM = _unscavengedHWM.get();
+ while(unscavengedHWM == null || unscavengedHWM.compareTo(queueEntry)<0)
+ {
+ _unscavengedHWM.compareAndSet(unscavengedHWM, queueEntry);
+ unscavengedHWM = _unscavengedHWM.get();
+ }
+ if (_scavenges.incrementAndGet() > _scavengeCount)
+ {
+ _scavenges.set(0L);
+ scavenge();
+ }
+ }
}
public int getPriorities()