diff options
7 files changed, 183 insertions, 67 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java index 599f0a8ca4..d25c096337 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java @@ -53,7 +53,7 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList private AtomicReference<MessageInhaler> _asynchronousInhaler = new AtomicReference(null); protected boolean _disabled; private AtomicReference<MessagePurger> _asynchronousPurger = new AtomicReference(null); - private static final int BATCH_INHALE_COUNT = 100; + private static final int BATCH_PROCESS_COUNT = 100; FlowableBaseQueueEntryList(AMQQueue queue) { @@ -76,7 +76,6 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList if (_flowed.get() != flowed) { _log.warn("Marking Queue(" + _queue.getName() + ") as flowed (" + flowed + ")"); - showUsage(); _flowed.set(flowed); } } @@ -126,20 +125,14 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList } // Don't attempt to start the inhaler/purger unless we have a minimum value specified. - if (_memoryUsageMaximum > 0) + if (_memoryUsageMaximum >= 0) { setMemoryUsageMinimum(_memoryUsageMaximum / 2); // if we have now have to much memory in use we need to purge. if (_memoryUsageMaximum < _atomicQueueInMemory.get()) { - startPurger(); - } - } - else if (_memoryUsageMaximum == 0) - { - if (_atomicQueueInMemory.get() > 0) - { + setFlowed(true); startPurger(); } } @@ -165,16 +158,15 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList // Don't attempt to start the inhaler unless we have a minimum value specified. if (_memoryUsageMinimum > 0) { - checkAndStartLoader(); + checkAndStartInhaler(); } } - private void checkAndStartLoader() + private void checkAndStartInhaler() { - // If we've increased the minimum memory above what we have in memory then we need to inhale more - long inMemory = _atomicQueueInMemory.get(); - // Can't check if inMemory == 0 or we will cause the inhaler thread to continually run. - if (inMemory < _memoryUsageMinimum || _memoryUsageMinimum == 0) + // If we've increased the minimum memory above what we have in memory then + // we need to inhale more if there is more + if (_atomicQueueInMemory.get() < _memoryUsageMinimum && _atomicQueueSize.get() > 0) { startInhaler(); } @@ -210,13 +202,14 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList * * @param queueEntry the entry to unload */ - public void unloadEntry(QueueEntry queueEntry) + public void entryUnloadedUpdateMemory(QueueEntry queueEntry) { if (_atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0) { _log.error("InMemory Count just went below 0:" + queueEntry.debugIdentity()); } - checkAndStartLoader(); + + checkAndStartInhaler(); } /** @@ -224,11 +217,13 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList * * @param queueEntry the entry to load */ - public void loadEntry(QueueEntry queueEntry) + public void entryLoadedUpdateMemory(QueueEntry queueEntry) { if (_atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum) { _log.error("Loaded to much data!:" + _atomicQueueInMemory.get() + "/" + _memoryUsageMaximum); + setFlowed(true); + startPurger(); } } @@ -311,11 +306,15 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList } _asynchronousInhaler.compareAndSet(messageInhaler, null); - int inhaled = 0; + int inhaled = 1; - while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) && (inhaled < BATCH_INHALE_COUNT) - && _asynchronousInhaler.compareAndSet(null, messageInhaler)) + while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) // we havn't filled our max memory + && (_atomicQueueInMemory.get() < _atomicQueueSize.get()) // we haven't loaded all that is available + && (inhaled < BATCH_PROCESS_COUNT) // limit the number of runs we do + && (inhaled > 0) // ensure we could inhale something + && _asynchronousInhaler.compareAndSet(null, messageInhaler)) // Ensure we are the running inhaler { + inhaled = 0; QueueEntryIterator iterator = iterator(); // If the inhaler is running and delivery rate picks up ensure that we just don't chase the delivery thread. @@ -325,7 +324,13 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList //Find first AVAILABLE node } - while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) && (inhaled < BATCH_INHALE_COUNT) && !iterator.atTail()) + // Because the above loop checks then moves on to the next entry a check for atTail will return true but + // we won't have checked the last entry to see if we can load it. So create atEndofList and update it based + // on the return from advance() which returns true if it can advance. + boolean atEndofList = false; + while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) // we havn't filled our max memory + && (inhaled < BATCH_PROCESS_COUNT) // limit the number of runs we do + && !atEndofList) // We have reached end of list QueueEntries { QueueEntry entry = iterator.getNode(); @@ -334,16 +339,20 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList if (_atomicQueueInMemory.get() + entry.getSize() > _memoryUsageMaximum) { // We don't have space for this message so we need to stop inhaling. - inhaled = BATCH_INHALE_COUNT; + if (_log.isDebugEnabled()) + { + _log.debug("Entry won't fit in memory stopping inhaler:" + entry.debugIdentity()); + } + inhaled = BATCH_PROCESS_COUNT; } else { - loadEntry(entry); + entry.load(); inhaled++; } } - iterator.advance(); + atEndofList = !iterator.advance(); } if (iterator.atTail()) @@ -402,20 +411,34 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList return; } + if (_log.isInfoEnabled()) + { + _log.info("Purger Running:" + _queue.getName()); + showUsage("Purger Running:" + _queue.getName()); + } + _asynchronousPurger.compareAndSet(messagePurger, null); + int purged = 0; - while ((_atomicQueueInMemory.get() >= _memoryUsageMinimum) && _asynchronousPurger.compareAndSet(null, messagePurger)) + while ((_atomicQueueInMemory.get() > _memoryUsageMinimum) + && purged < BATCH_PROCESS_COUNT + && _asynchronousPurger.compareAndSet(null, messagePurger)) { QueueEntryIterator iterator = iterator(); + //There are potentially AQUIRED messages that can be purged but we can't purge the last AQUIRED message + // as it may have just become AQUIRED and not yet delivered. + + //To be safe only purge available messages. This should be fine as long as we have a small prefetch. while (!iterator.getNode().isAvailable() && iterator.advance()) { //Find first AVAILABLE node } - // Count up the memory usage + // Count up the memory usage to find our minimum point long memoryUsage = 0; - while ((memoryUsage < _memoryUsageMaximum) && !iterator.atTail()) + boolean atTail = false; + while ((memoryUsage < _memoryUsageMaximum) && !atTail) { QueueEntry entry = iterator.getNode(); @@ -424,30 +447,37 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList memoryUsage += entry.getSize(); } - iterator.advance(); + atTail = !iterator.advance(); } //Purge remainging mesages on queue - while (!iterator.atTail()) + while (!atTail && (purged < BATCH_PROCESS_COUNT)) { QueueEntry entry = iterator.getNode(); if (entry.isAvailable() && !entry.isFlowed()) { entry.unload(); + purged++; } - iterator.advance(); + atTail = !iterator.advance(); } - _asynchronousInhaler.set(null); + _asynchronousPurger.set(null); } - //If we have become flowed or have more capacity since we stopped then schedule the thread to run again. - if (_flowed.get() && _atomicQueueInMemory.get() < _memoryUsageMaximum) + if (_log.isInfoEnabled()) { - _inhaler.execute(messagePurger); + _log.info("Purger Stopping:" + _queue.getName()); + showUsage("Purger Stopping:" + _queue.getName()); + } + //If we are still flowed and are over the minimum value then schedule to run again. + if (_flowed.get() && _atomicQueueInMemory.get() > _memoryUsageMinimum) + { + _log.info("Rescheduling Purger:" + _queue.getName()); + _purger.execute(messagePurger); } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java index 61a3c45c49..89f6e6378b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java @@ -79,13 +79,17 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement long reclaimed = 0; //work down the priorities looking for memory - int scavangeIndex = _priorities - 1; //First: Don't take all the memory. So look for a queue that has more than 50% free - long currentMax; + int scavangeIndex = 0; + + if (scavangeIndex == index) + { + scavangeIndex++; + } - while (scavangeIndex >= 0 && reclaimed <= requriedSize) + while (scavangeIndex < _priorities && reclaimed <= requriedSize) { currentMax = _priorityLists[scavangeIndex].getMemoryUsageMaximum(); long used = _priorityLists[scavangeIndex].memoryUsed(); @@ -105,15 +109,24 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement } else { - scavangeIndex--; + scavangeIndex++; + if (scavangeIndex == index) + { + scavangeIndex++; + } } - } + } - //Second: Just take the memory we need - if (scavangeIndex == -1) + //Second: Just take the free memory we need + if (scavangeIndex == _priorities) { - scavangeIndex = _priorities - 1; - while (scavangeIndex >= 0 && reclaimed <= requriedSize) + scavangeIndex = 0; + if (scavangeIndex == index) + { + scavangeIndex++; + } + + while (scavangeIndex < _priorities && reclaimed <= requriedSize) { currentMax = _priorityLists[scavangeIndex].getMemoryUsageMaximum(); long used = _priorityLists[scavangeIndex].memoryUsed(); @@ -139,7 +152,51 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement } else { - scavangeIndex--; + scavangeIndex++; + if (scavangeIndex == index) + { + scavangeIndex++; + } + } + } + + //Third: Take required memory + if (scavangeIndex == _priorities) + { + scavangeIndex = 0; + if (scavangeIndex == index) + { + scavangeIndex++; + } + while (scavangeIndex < _priorities && reclaimed <= requriedSize) + { + currentMax = _priorityLists[scavangeIndex].getMemoryUsageMaximum(); + + if (currentMax > 0 ) + { + long newMax = currentMax; + // Just take the amount of space required for this message. + if (newMax > requriedSize) + { + newMax = requriedSize; + } + _priorityLists[scavangeIndex].setMemoryUsageMaximum(newMax); + + reclaimed += currentMax - newMax; + if (_log.isDebugEnabled()) + { + _log.debug("Reclaiming(3) :" + (currentMax - newMax) + "(" + reclaimed + "/" + requriedSize + ") from queue:" + scavangeIndex); + } + break; + } + else + { + scavangeIndex++; + if (scavangeIndex == index) + { + scavangeIndex++; + } + } } } } @@ -159,7 +216,10 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement _log.debug("No space found."); } - showUsage(); + if (_log.isTraceEnabled()) + { + showUsage("Add"); + } } return _priorityLists[index].add(message); @@ -170,7 +230,10 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement { if (_log.isDebugEnabled()) { - _log.debug(prefix); + if (prefix.length() != 0) + { + _log.debug(prefix); + } for (int index = 0; index < _priorities; index++) { QueueEntryList queueEntryList = _priorityLists[index]; @@ -280,10 +343,16 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement { boolean flowed = false; boolean full = true; - showUsage(); + + if (_log.isTraceEnabled()) + { + showUsage("isFlowed"); + } + for (QueueEntryList queueEntryList : _priorityLists) { - full = full && queueEntryList.getMemoryUsageMaximum() == queueEntryList.memoryUsed(); + //full = full && queueEntryList.getMemoryUsageMaximum() == queueEntryList.memoryUsed(); + full = full && queueEntryList.getMemoryUsageMaximum() <= queueEntryList.dataSize(); flowed = flowed || (queueEntryList.isFlowed()); } return flowed && full; diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 2a579b83c6..32abe3e1cf 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -411,7 +411,7 @@ public class QueueEntryImpl implements QueueEntry //Update the memoryState if this load call resulted in the message being purged from memory if (!_flowed.getAndSet(true)) { - _queueEntryList.unloadEntry(this); + _queueEntryList.entryUnloadedUpdateMemory(this); } } catch (UnableToFlowMessageException utfme) { @@ -438,7 +438,7 @@ public class QueueEntryImpl implements QueueEntry //Update the memoryState if this load call resulted in the message comming in to memory if (_flowed.getAndSet(false)) { - _queueEntryList.loadEntry(this); + _queueEntryList.entryLoadedUpdateMemory(this); } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java index 706f10dca1..a58c6eaf7d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java @@ -54,13 +54,13 @@ public interface QueueEntryList * Immediately update memory usage based on the unload of this queueEntry, potentially start inhaler. * @param queueEntry the entry that has been unloaded */ - void unloadEntry(QueueEntry queueEntry); + void entryUnloadedUpdateMemory(QueueEntry queueEntry); /** * Immediately update memory usage based on the load of this queueEntry * @param queueEntry the entry that has been loaded */ - void loadEntry(QueueEntry queueEntry); + void entryLoadedUpdateMemory(QueueEntry queueEntry); void stop(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 9f1472439c..5730e419d5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -468,6 +468,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener if (entry.isFlowed()) { + if(_logger.isDebugEnabled()) + { + _logger.debug("Synchoronus load of entry:" + entry.debugIdentity()); + } entry.load(); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java index 623f57b224..800bb1ac9c 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java @@ -134,17 +134,21 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest assertTrue("Queue is flowed.", !_queue.isFlowed()); // Send another and ensure we are flowed - sendMessage(txnContext); - + sendMessage(txnContext, 9); + + //Give the Purging Thread a chance to run + Thread.yield(); + Thread.sleep(500); + assertTrue("Queue is not flowed.", _queue.isFlowed()); - assertEquals(MESSAGE_COUNT / 2 + 1, _queue.getMessageCount()); - assertEquals(MESSAGE_COUNT / 2, _queue.getMemoryUsageCurrent()); + assertEquals("Queue contains more messages than expected.", MESSAGE_COUNT / 2 + 1, _queue.getMessageCount()); + assertEquals("Queue over memory quota.",MESSAGE_COUNT / 2, _queue.getMemoryUsageCurrent()); - //send another 99 so there are 200msgs in total on the queue - for (int msgCount = 0; msgCount < (MESSAGE_COUNT / 2) - 1; msgCount++) + //send another batch of messagse so the total in each queue is equal + for (int msgCount = 0; msgCount < (MESSAGE_COUNT / 2) ; msgCount++) { - sendMessage(txnContext); + sendMessage(txnContext, (msgCount % 10)); long usage = _queue.getMemoryUsageCurrent(); assertTrue("Queue has gone over quota:" + usage, @@ -153,21 +157,22 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest assertTrue("Queue has a negative quota:" + usage, usage > 0); } - assertEquals(MESSAGE_COUNT, _queue.getMessageCount()); + assertEquals(MESSAGE_COUNT + 1, _queue.getMessageCount()); assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent()); assertTrue("Queue is not flowed.", _queue.isFlowed()); _queue.registerSubscription(_subscription, false); int slept = 0; - while (_subscription.getQueueEntries().size() != MESSAGE_COUNT && slept < 10) + while (_subscription.getQueueEntries().size() != MESSAGE_COUNT + 1 && slept < 10) { + Thread.yield(); Thread.sleep(500); slept++; } //Ensure the messages are retreived - assertEquals("Not all messages were received, slept:" + slept / 2 + "s", MESSAGE_COUNT, _subscription.getQueueEntries().size()); + assertEquals("Not all messages were received, slept:" + slept / 2 + "s", MESSAGE_COUNT + 1, _subscription.getQueueEntries().size()); //Check the queue is still within it's limits. assertTrue("Queue has gone over quota:" + _queue.getMemoryUsageCurrent(), diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 5d7fa21d56..86d1948f20 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -434,7 +434,9 @@ public class SimpleAMQQueueTest extends TestCase NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null); MESSAGE_SIZE = 1; - long MEMORY_MAX = 10; + /** Set to larger than the purge batch size. Default 100. + * @see FlowableBaseQueueEntryList.BATCH_PROCESS_COUNT */ + long MEMORY_MAX = 500; int MESSAGE_COUNT = (int) MEMORY_MAX; //Set the Memory Usage to be very low _queue.setMemoryUsageMaximum(MEMORY_MAX); @@ -457,8 +459,14 @@ public class SimpleAMQQueueTest extends TestCase _queue.setMemoryUsageMaximum(0L); - //Give the purger time to work - Thread.sleep(200); + //Give the purger time to work maximum of 1s + int slept = 0; + while (_queue.getMemoryUsageCurrent() > 0 && slept < 5) + { + Thread.yield(); + Thread.sleep(200); + slept++; + } assertEquals(MESSAGE_COUNT + 1, _queue.getMessageCount()); assertEquals(0L , _queue.getMemoryUsageCurrent()); |
