diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2009-03-26 16:57:21 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2009-03-26 16:57:21 +0000 |
| commit | 4c37019df181f3e032c7615e24c43c9858dc71e4 (patch) | |
| tree | dbf48163181735634df295f9dab5a3f711613a2a /java | |
| parent | 91a7ec3f337d2fc8aacd672b0455e0b85a144309 (diff) | |
| download | qpid-python-4c37019df181f3e032c7615e24c43c9858dc71e4.tar.gz | |
QPID-1768 : Removed all the special priority queue code. Added the ability for a FlowableBaseQueueEntryList to delegate its accounting to a parent QueueEntryList. This results in the PriorityQueueEntryList using the same FtD algorithm as SimpleQELs.
- New Messages on a flowed queue are pushed optimistically pushed to disk, this should potentially be removed and just rely on the purger to flush the correct message which in the Priority case may not be the last message in.
- When space is available messages are loaded in queue order, so in this case Priority order.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@758742 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
6 files changed, 89 insertions, 335 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java index 4b387eac53..ca7fc5abf6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java @@ -54,6 +54,7 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList protected boolean _disableFlowToDisk; private AtomicReference<MessagePurger> _asynchronousPurger = new AtomicReference(null); private static final int BATCH_PROCESS_COUNT = 100; + protected FlowableBaseQueueEntryList _parentQueue; FlowableBaseQueueEntryList(AMQQueue queue) { @@ -89,7 +90,7 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList { if (_log.isDebugEnabled()) { - _log.debug(prefix + " Queue(" + _queue + ":" + _queue.getName() + ") usage:" + memoryUsed() + _log.debug(prefix + " Queue(" + _queue.getName() + ") usage:" + memoryUsed() + "/" + getMemoryUsageMinimum() + "<>" + getMemoryUsageMaximum() + "/" + dataSize()); } @@ -97,7 +98,14 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList public boolean isFlowed() { - return _flowed.get(); + if (_parentQueue != null) + { + return _parentQueue.isFlowed(); + } + else + { + return _flowed.get(); + } } public int size() @@ -204,12 +212,19 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList */ public void entryUnloadedUpdateMemory(QueueEntry queueEntry) { - if (!_disableFlowToDisk && _atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0) + if (_parentQueue != null) { - _log.error("InMemory Count just went below 0:" + queueEntry.debugIdentity()); + _parentQueue.entryUnloadedUpdateMemory(queueEntry); } + else + { + if (!_disableFlowToDisk && _atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0) + { + _log.error("InMemory Count just went below 0:" + queueEntry.debugIdentity()); + } - checkAndStartInhaler(); + checkAndStartInhaler(); + } } /** @@ -219,11 +234,18 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList */ public void entryLoadedUpdateMemory(QueueEntry queueEntry) { - if (!_disableFlowToDisk && _atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum) + if (_parentQueue != null) { - _log.error("Loaded to much data!:" + _atomicQueueInMemory.get() + "/" + _memoryUsageMaximum); - setFlowed(true); - startPurger(); + _parentQueue.entryLoadedUpdateMemory(queueEntry); + } + else + { + if (!_disableFlowToDisk && _atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum) + { + _log.error("Loaded to much data!:" + _atomicQueueInMemory.get() + "/" + _memoryUsageMaximum); + setFlowed(true); + startPurger(); + } } } @@ -241,28 +263,55 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList } } - protected void incrementCounters(final QueueEntryImpl queueEntry) + /** + * Mark this queue as part of another QueueEntryList for accounting purposes. + * + * All Calls from the QueueEntry to the QueueEntryList need to check if there is + * a parent QueueEntrylist upon which the action should take place. + * + * @param queueEntryList The parent queue that is performing accounting. + */ + public void setParentQueueEntryList(FlowableBaseQueueEntryList queueEntryList) { - _atomicQueueCount.incrementAndGet(); - _atomicQueueSize.addAndGet(queueEntry.getSize()); - long inUseMemory = _atomicQueueInMemory.addAndGet(queueEntry.getSize()); + _parentQueue = queueEntryList; + } - if (!_disableFlowToDisk && inUseMemory > _memoryUsageMaximum) + protected void incrementCounters(final QueueEntryImpl queueEntry) + { + if (_parentQueue != null) + { + _parentQueue.incrementCounters(queueEntry); + } + else { - setFlowed(true); - queueEntry.unload(); + _atomicQueueCount.incrementAndGet(); + _atomicQueueSize.addAndGet(queueEntry.getSize()); + long inUseMemory = _atomicQueueInMemory.addAndGet(queueEntry.getSize()); + + if (!_disableFlowToDisk && inUseMemory > _memoryUsageMaximum) + { + setFlowed(true); + queueEntry.unload(); + } } } protected void dequeued(QueueEntryImpl queueEntry) { - _atomicQueueCount.decrementAndGet(); - _atomicQueueSize.addAndGet(-queueEntry.getSize()); - if (!queueEntry.isFlowed()) + if (_parentQueue != null) { - if (_atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0) + _parentQueue.dequeued(queueEntry); + } + else + { + _atomicQueueCount.decrementAndGet(); + _atomicQueueSize.addAndGet(-queueEntry.getSize()); + if (!queueEntry.isFlowed()) { - _log.error("InMemory Count just went below 0 on dequeue."); + if (_atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0) + { + _log.error("InMemory Count just went below 0 on dequeue."); + } } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java index fc11dd888a..83c7ebb4f2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java @@ -39,6 +39,7 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement for (int i = 0; i < priorities; i++) { _priorityLists[i] = new SimpleQueueEntryList(queue); + _priorityLists[i].setParentQueueEntryList(this); } showUsage("Created:" + _queue.getName()); @@ -66,183 +67,9 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement index = 0; } - long requriedSize = message.getSize(); - // Check and see if list would flow on adding message - if (!_disableFlowToDisk && !isFlowed() && _priorityLists[index].memoryUsed() + requriedSize > _priorityLists[index].getMemoryUsageMaximum()) - { - if (_log.isDebugEnabled()) - { - _log.debug("Message(" + message.debugIdentity() + ") Add of size (" - + requriedSize + ") will cause flow. Searching for space"); - } - - long reclaimed = 0; - - //work down the priorities looking for memory - - //First: Don't take all the memory. So look for a queue that has more than 50% free - long currentMax; - int scavangeIndex = 0; - - if (scavangeIndex == index) - { - scavangeIndex++; - } - - while (scavangeIndex < _priorities && reclaimed <= requriedSize) - { - currentMax = _priorityLists[scavangeIndex].getMemoryUsageMaximum(); - long used = _priorityLists[scavangeIndex].memoryUsed(); - - if (used < currentMax / 2) - { - long newMax = currentMax / 2; - - _priorityLists[scavangeIndex].setMemoryUsageMaximum(newMax); - - reclaimed += currentMax - newMax; - if (_log.isDebugEnabled()) - { - _log.debug("Reclaiming(1) :" + (currentMax - newMax) + "(" + reclaimed + "/" + requriedSize + ") from queue:" + scavangeIndex); - } - break; - } - else - { - scavangeIndex++; - if (scavangeIndex == index) - { - scavangeIndex++; - } - } - } - - //Second: Just take the free memory we need - if (scavangeIndex == _priorities) - { - scavangeIndex = 0; - if (scavangeIndex == index) - { - scavangeIndex++; - } - - while (scavangeIndex < _priorities && reclaimed <= requriedSize) - { - currentMax = _priorityLists[scavangeIndex].getMemoryUsageMaximum(); - long used = _priorityLists[scavangeIndex].memoryUsed(); - - if (used < currentMax) - { - long newMax = currentMax - used; - - // if there are no messages at this priority just take it all - if (newMax == currentMax) - { - newMax = 0; - } - - _priorityLists[scavangeIndex].setMemoryUsageMaximum(newMax); - - reclaimed += currentMax - newMax; - if (_log.isDebugEnabled()) - { - _log.debug("Reclaiming(2) :" + (currentMax - newMax) + "(" + reclaimed + "/" + requriedSize + ") from queue:" + scavangeIndex); - } - break; - } - else - { - scavangeIndex++; - if (scavangeIndex == index) - { - scavangeIndex++; - } - } - } - - //Third: Take required memory - if (scavangeIndex == _priorities) - { - scavangeIndex = 0; - if (scavangeIndex == index) - { - scavangeIndex++; - } - while (scavangeIndex < _priorities && reclaimed <= requriedSize) - { - currentMax = _priorityLists[scavangeIndex].getMemoryUsageMaximum(); - - if (currentMax > 0 ) - { - long newMax = currentMax; - // Just take the amount of space required for this message. - if (newMax > requriedSize) - { - newMax = requriedSize; - } - _priorityLists[scavangeIndex].setMemoryUsageMaximum(newMax); - - reclaimed += currentMax - newMax; - if (_log.isDebugEnabled()) - { - _log.debug("Reclaiming(3) :" + (currentMax - newMax) + "(" + reclaimed + "/" + requriedSize + ") from queue:" + scavangeIndex); - } - break; - } - else - { - scavangeIndex++; - if (scavangeIndex == index) - { - scavangeIndex++; - } - } - } - } - } - - //Increment Maximum - if (reclaimed > 0) - { - if (_log.isDebugEnabled()) - { - _log.debug("Increasing queue(" + index + ") maximum by " + reclaimed - + " to " + (_priorityLists[index].getMemoryUsageMaximum() + reclaimed)); - } - _priorityLists[index].setMemoryUsageMaximum(_priorityLists[index].getMemoryUsageMaximum() + reclaimed); - } - else - { - _log.debug("No space found."); - } - - if (_log.isTraceEnabled()) - { - showUsage("Add"); - } - } - return _priorityLists[index].add(message); } - @Override - protected void showUsage(String prefix) - { - if (_log.isDebugEnabled()) - { - if (prefix.length() != 0) - { - _log.debug(prefix); - } - for (int index = 0; index < _priorities; index++) - { - QueueEntryList queueEntryList = _priorityLists[index]; - _log.debug("Queue (" + _queue.getName() + ")[" + index + "] usage:" + queueEntryList.memoryUsed() - + "/" + queueEntryList.getMemoryUsageMaximum() - + "/" + queueEntryList.dataSize()); - } - } - } public QueueEntry next(QueueEntry node) { @@ -338,122 +165,7 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement } } - @Override - public boolean isFlowed() - { - boolean flowed = false; - boolean full = true; - - if (_log.isTraceEnabled()) - { - showUsage("isFlowed"); - } - - for (QueueEntryList queueEntryList : _priorityLists) - { - //full = full && queueEntryList.getMemoryUsageMaximum() == queueEntryList.memoryUsed(); - full = full && queueEntryList.getMemoryUsageMaximum() <= queueEntryList.dataSize(); - flowed = flowed || (queueEntryList.isFlowed()); - } - return flowed && full; - } - - @Override - public int size() - { - int size = 0; - for (QueueEntryList queueEntryList : _priorityLists) - { - size += queueEntryList.size(); - } - - return size; - } - - @Override - public long dataSize() - { - int dataSize = 0; - for (QueueEntryList queueEntryList : _priorityLists) - { - dataSize += queueEntryList.dataSize(); - } - - return dataSize; - } - - @Override - public long memoryUsed() - { - int memoryUsed = 0; - for (QueueEntryList queueEntryList : _priorityLists) - { - memoryUsed += queueEntryList.memoryUsed(); - } - - return memoryUsed; - } - - @Override - public void setMemoryUsageMaximum(long maximumMemoryUsage) - { - _memoryUsageMaximum = maximumMemoryUsage; - - if (maximumMemoryUsage >= 0) - { - _disableFlowToDisk = false; - } - - long share = maximumMemoryUsage / _priorities; - - //Apply a share of the maximum To each prioirty quue - for (QueueEntryList queueEntryList : _priorityLists) - { - queueEntryList.setMemoryUsageMaximum(share); - } - - if (maximumMemoryUsage < 0) - { - if (_log.isInfoEnabled()) - { - _log.info("Disabling Flow to Disk for queue:" + _queue.getName()); - } - _disableFlowToDisk = true; - return; - } - - //ensure we use the full allocation of memory - long remainder = maximumMemoryUsage - (share * _priorities); - if (remainder > 0) - { - _priorityLists[_priorities - 1].setMemoryUsageMaximum(share + remainder); - } - } - - @Override - public long getMemoryUsageMaximum() - { - return _memoryUsageMaximum; - } - - @Override - public void setMemoryUsageMinimum(long minimumMemoryUsage) - { - _memoryUsageMinimum = minimumMemoryUsage; - - //Apply a share of the minimum To each prioirty quue - for (QueueEntryList queueEntryList : _priorityLists) - { - queueEntryList.setMemoryUsageMaximum(minimumMemoryUsage / _priorities); - } - } - - @Override - public long getMemoryUsageMinimum() - { - return _memoryUsageMinimum; - } - + @Override public void stop() { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index b6e6365189..e6223ef4ac 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -155,6 +155,13 @@ public class QueueEntryImpl implements QueueEntry return (_flags & DELIVERED_TO_CONSUMER) != 0; } + /** + * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). + * And for selector efficiency. + * + * This is now also used to unload the message if this entry is on a flowed queue. As a result this method should + * only be called after the message has been sent. + */ public void setDeliveredToSubscription() { _flags |= DELIVERED_TO_CONSUMER; diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java index a58c6eaf7d..2bbdf610de 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java @@ -63,4 +63,14 @@ public interface QueueEntryList void entryLoadedUpdateMemory(QueueEntry queueEntry); void stop(); + + /** + * Mark this queue as part of another QueueEntryList for accounting purposes. + * + * All Calls from the QueueEntry to the QueueEntryList need to check if there is + * a parent QueueEntrylist upon which the action should take place. + * + * @param queueEntryList The parent queue that is performing accounting. + */ + void setParentQueueEntryList(FlowableBaseQueueEntryList queueEntryList); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryPriorityTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryPriorityTest.java index 0d6c5948b4..ee3e727b80 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryPriorityTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryPriorityTest.java @@ -50,21 +50,4 @@ public class AMQQueueFactoryPriorityTest extends AMQQueueFactoryTest fail(e.getMessage()); } } - - @Override - public void testQueueValuesAfterCreation() - { - try - { - AMQQueue queue = createQueue(); - - assertEquals("MemoryMaximumSize not set correctly:", MAX_SIZE, queue.getMemoryUsageMaximum()); - //NOTE: Priority queue will show 0 as minimum as the minimum value is actually spread between its sub QELs - assertEquals("MemoryMinimumSize not 0 as expected for a priority queue:", 0, queue.getMemoryUsageMinimum()); - } - catch (AMQException e) - { - fail(e.getMessage()); - } - } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java index 11049a7ae3..0f4230806f 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java @@ -48,11 +48,4 @@ public class MockAMQMessage extends TransientAMQMessage _contentHeaderBody = new ContentHeaderBody(properties, BasicPublishBodyImpl.CLASS_ID); _contentBodies = new ArrayList<ContentChunk>(); } - - - @Override - public long getSize() - { - return 0l; - } } |
