diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2009-04-03 13:33:42 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2009-04-03 13:33:42 +0000 |
| commit | 468568e85b76a040aeedb984be79913347700016 (patch) | |
| tree | 872d07f8b39a6853713ff8e5e61dcc5909a4c02c /java/broker | |
| parent | d62a09ce7b5e3c7a66ec85c4064d40e012552b0c (diff) | |
| download | qpid-python-468568e85b76a040aeedb984be79913347700016.tar.gz | |
QPID-1784 Update to FlowableBaseQueueEntryList to ensure that the inhaler and purger threads will stop when the inMemory values are within the correct range.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@761671 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java | 23 |
1 files changed, 18 insertions, 5 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java index a7c7d87a6e..10d2f0ee2b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java @@ -359,7 +359,12 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList _asynchronousInhaler.compareAndSet(messageInhaler, null); int inhaled = 1; + //Because we may not be able to totally fill up to _memoryUsageMaximum we need to be able to say we've done + // enough loading and this inhale process should stop + boolean finshedInhaling = false; + while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) // we havn't filled our max memory + && !finshedInhaling // Have we loaded all we can fit into memory && (_atomicQueueInMemory.get() < _atomicQueueSize.get()) // we haven't loaded all that is available && (inhaled < BATCH_PROCESS_COUNT) // limit the number of runs we do && (inhaled > 0) // ensure we could inhale something @@ -379,7 +384,9 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList // we won't have checked the last entry to see if we can load it. So create atEndofList and update it based // on the return from advance() which returns true if it can advance. boolean atEndofList = false; - while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) // we havn't filled our max memory + + while ((_atomicQueueInMemory.get() <= _memoryUsageMaximum) // we haven't filled our max memory + && !finshedInhaling // Have we loaded all we can fit into memory && (inhaled < BATCH_PROCESS_COUNT) // limit the number of runs we do && !atEndofList) // We have reached end of list QueueEntries { @@ -394,7 +401,7 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList { _log.debug("Entry won't fit in memory stopping inhaler:" + entry.debugIdentity()); } - inhaled = BATCH_PROCESS_COUNT; + finshedInhaling = true; } else { @@ -421,7 +428,7 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList } //If we have become flowed or have more capacity since we stopped then schedule the thread to run again. - if (_flowed.get() && _atomicQueueInMemory.get() < _memoryUsageMaximum) + if (!finshedInhaling _flowed.get() && _atomicQueueInMemory.get() < _memoryUsageMaximum) { if (_log.isInfoEnabled()) { @@ -471,7 +478,7 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList _asynchronousPurger.compareAndSet(messagePurger, null); int purged = 0; - while ((_atomicQueueInMemory.get() > _memoryUsageMinimum) + while ((_atomicQueueInMemory.get() > _memoryUsageMaximum) && purged < BATCH_PROCESS_COUNT && _asynchronousPurger.compareAndSet(null, messagePurger)) { @@ -496,6 +503,12 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList if (entry.isAvailable() && !entry.isFlowed()) { memoryUsage += entry.getSize(); + // If this message is what puts us over the limit then break + // out of this loop as we need to purge this item. + if (memoryUsage > _memoryUsageMaximum) + { + break; + } } atTail = !iterator.advance(); @@ -525,7 +538,7 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList } //If we are still flowed and are over the minimum value then schedule to run again. - if (_flowed.get() && _atomicQueueInMemory.get() > _memoryUsageMinimum) + if (_flowed.get() && _atomicQueueInMemory.get() > _memoryUsageMaximum) { if (_log.isInfoEnabled()) { |
