summaryrefslogtreecommitdiff
path: root/java/broker
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2009-04-03 13:33:42 +0000
committerMartin Ritchie <ritchiem@apache.org>2009-04-03 13:33:42 +0000
commit468568e85b76a040aeedb984be79913347700016 (patch)
tree872d07f8b39a6853713ff8e5e61dcc5909a4c02c /java/broker
parentd62a09ce7b5e3c7a66ec85c4064d40e012552b0c (diff)
downloadqpid-python-468568e85b76a040aeedb984be79913347700016.tar.gz
QPID-1784 Update to FlowableBaseQueueEntryList to ensure that the inhaler and purger threads will stop when the inMemory values are within the correct range.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@761671 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java23
1 files changed, 18 insertions, 5 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
index a7c7d87a6e..10d2f0ee2b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
@@ -359,7 +359,12 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList
_asynchronousInhaler.compareAndSet(messageInhaler, null);
int inhaled = 1;
+ //Because we may not be able to totally fill up to _memoryUsageMaximum we need to be able to say we've done
+ // enough loading and this inhale process should stop
+ boolean finshedInhaling = false;
+
while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) // we havn't filled our max memory
+ && !finshedInhaling // Have we loaded all we can fit into memory
&& (_atomicQueueInMemory.get() < _atomicQueueSize.get()) // we haven't loaded all that is available
&& (inhaled < BATCH_PROCESS_COUNT) // limit the number of runs we do
&& (inhaled > 0) // ensure we could inhale something
@@ -379,7 +384,9 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList
// we won't have checked the last entry to see if we can load it. So create atEndofList and update it based
// on the return from advance() which returns true if it can advance.
boolean atEndofList = false;
- while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) // we havn't filled our max memory
+
+ while ((_atomicQueueInMemory.get() <= _memoryUsageMaximum) // we haven't filled our max memory
+ && !finshedInhaling // Have we loaded all we can fit into memory
&& (inhaled < BATCH_PROCESS_COUNT) // limit the number of runs we do
&& !atEndofList) // We have reached end of list QueueEntries
{
@@ -394,7 +401,7 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList
{
_log.debug("Entry won't fit in memory stopping inhaler:" + entry.debugIdentity());
}
- inhaled = BATCH_PROCESS_COUNT;
+ finshedInhaling = true;
}
else
{
@@ -421,7 +428,7 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList
}
//If we have become flowed or have more capacity since we stopped then schedule the thread to run again.
- if (_flowed.get() && _atomicQueueInMemory.get() < _memoryUsageMaximum)
+ if (!finshedInhaling _flowed.get() && _atomicQueueInMemory.get() < _memoryUsageMaximum)
{
if (_log.isInfoEnabled())
{
@@ -471,7 +478,7 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList
_asynchronousPurger.compareAndSet(messagePurger, null);
int purged = 0;
- while ((_atomicQueueInMemory.get() > _memoryUsageMinimum)
+ while ((_atomicQueueInMemory.get() > _memoryUsageMaximum)
&& purged < BATCH_PROCESS_COUNT
&& _asynchronousPurger.compareAndSet(null, messagePurger))
{
@@ -496,6 +503,12 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList
if (entry.isAvailable() && !entry.isFlowed())
{
memoryUsage += entry.getSize();
+ // If this message is what puts us over the limit then break
+ // out of this loop as we need to purge this item.
+ if (memoryUsage > _memoryUsageMaximum)
+ {
+ break;
+ }
}
atTail = !iterator.advance();
@@ -525,7 +538,7 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList
}
//If we are still flowed and are over the minimum value then schedule to run again.
- if (_flowed.get() && _atomicQueueInMemory.get() > _memoryUsageMinimum)
+ if (_flowed.get() && _atomicQueueInMemory.get() > _memoryUsageMaximum)
{
if (_log.isInfoEnabled())
{