diff options
Diffstat (limited to 'java')
6 files changed, 89 insertions, 335 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java index 4b387eac53..ca7fc5abf6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java @@ -54,6 +54,7 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList protected boolean _disableFlowToDisk; private AtomicReference<MessagePurger> _asynchronousPurger = new AtomicReference(null); private static final int BATCH_PROCESS_COUNT = 100; + protected FlowableBaseQueueEntryList _parentQueue; FlowableBaseQueueEntryList(AMQQueue queue) { @@ -89,7 +90,7 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList { if (_log.isDebugEnabled()) { - _log.debug(prefix + " Queue(" + _queue + ":" + _queue.getName() + ") usage:" + memoryUsed() + _log.debug(prefix + " Queue(" + _queue.getName() + ") usage:" + memoryUsed() + "/" + getMemoryUsageMinimum() + "<>" + getMemoryUsageMaximum() + "/" + dataSize()); } @@ -97,7 +98,14 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList public boolean isFlowed() { - return _flowed.get(); + if (_parentQueue != null) + { + return _parentQueue.isFlowed(); + } + else + { + return _flowed.get(); + } } public int size() @@ -204,12 +212,19 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList */ public void entryUnloadedUpdateMemory(QueueEntry queueEntry) { - if (!_disableFlowToDisk && _atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0) + if (_parentQueue != null) { - _log.error("InMemory Count just went below 0:" + queueEntry.debugIdentity()); + _parentQueue.entryUnloadedUpdateMemory(queueEntry); } + else + { + if (!_disableFlowToDisk && _atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0) + { + _log.error("InMemory Count just went below 0:" + queueEntry.debugIdentity()); + } - checkAndStartInhaler(); + checkAndStartInhaler(); + } } /** @@ -219,11 +234,18 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList */ public void entryLoadedUpdateMemory(QueueEntry queueEntry) { - if (!_disableFlowToDisk && _atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum) + if (_parentQueue != null) { - _log.error("Loaded to much data!:" + _atomicQueueInMemory.get() + "/" + _memoryUsageMaximum); - setFlowed(true); - startPurger(); + _parentQueue.entryLoadedUpdateMemory(queueEntry); + } + else + { + if (!_disableFlowToDisk && _atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum) + { + _log.error("Loaded to much data!:" + _atomicQueueInMemory.get() + "/" + _memoryUsageMaximum); + setFlowed(true); + startPurger(); + } } } @@ -241,28 +263,55 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList } } - protected void incrementCounters(final QueueEntryImpl queueEntry) + /** + * Mark this queue as part of another QueueEntryList for accounting purposes. + * + * All Calls from the QueueEntry to the QueueEntryList need to check if there is + * a parent QueueEntrylist upon which the action should take place. + * + * @param queueEntryList The parent queue that is performing accounting. + */ + public void setParentQueueEntryList(FlowableBaseQueueEntryList queueEntryList) { - _atomicQueueCount.incrementAndGet(); - _atomicQueueSize.addAndGet(queueEntry.getSize()); - long inUseMemory = _atomicQueueInMemory.addAndGet(queueEntry.getSize()); + _parentQueue = queueEntryList; + } - if (!_disableFlowToDisk && inUseMemory > _memoryUsageMaximum) + protected void incrementCounters(final QueueEntryImpl queueEntry) + { + if (_parentQueue != null) + { + _parentQueue.incrementCounters(queueEntry); + } + else { - setFlowed(true); - queueEntry.unload(); + _atomicQueueCount.incrementAndGet(); + _atomicQueueSize.addAndGet(queueEntry.getSize()); + long inUseMemory = _atomicQueueInMemory.addAndGet(queueEntry.getSize()); + + if (!_disableFlowToDisk && inUseMemory > _memoryUsageMaximum) + { + setFlowed(true); + queueEntry.unload(); + } } } protected void dequeued(QueueEntryImpl queueEntry) { - _atomicQueueCount.decrementAndGet(); - _atomicQueueSize.addAndGet(-queueEntry.getSize()); - if (!queueEntry.isFlowed()) + if (_parentQueue != null) { - if (_atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0) + _parentQueue.dequeued(queueEntry); + } + else + { + _atomicQueueCount.decrementAndGet(); + _atomicQueueSize.addAndGet(-queueEntry.getSize()); + if (!queueEntry.isFlowed()) { - _log.error("InMemory Count just went below 0 on dequeue."); + if (_atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0) + { + _log.error("InMemory Count just went below 0 on dequeue."); + } } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java index fc11dd888a..83c7ebb4f2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java @@ -39,6 +39,7 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement for (int i = 0; i < priorities; i++) { _priorityLists[i] = new SimpleQueueEntryList(queue); + _priorityLists[i].setParentQueueEntryList(this); } showUsage("Created:" + _queue.getName()); @@ -66,183 +67,9 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement index = 0; } - long requriedSize = message.getSize(); - // Check and see if list would flow on adding message - if (!_disableFlowToDisk && !isFlowed() && _priorityLists[index].memoryUsed() + requriedSize > _priorityLists[index].getMemoryUsageMaximum()) - { - if (_log.isDebugEnabled()) - { - _log.debug("Message(" + message.debugIdentity() + ") Add of size (" - + requriedSize + ") will cause flow. Searching for space"); - } - - long reclaimed = 0; - - //work down the priorities looking for memory - - //First: Don't take all the memory. So look for a queue that has more than 50% free - long currentMax; - int scavangeIndex = 0; - - if (scavangeIndex == index) - { - scavangeIndex++; - } - - while (scavangeIndex < _priorities && reclaimed <= requriedSize) - { - currentMax = _priorityLists[scavangeIndex].getMemoryUsageMaximum(); - long used = _priorityLists[scavangeIndex].memoryUsed(); - - if (used < currentMax / 2) - { - long newMax = currentMax / 2; - - _priorityLists[scavangeIndex].setMemoryUsageMaximum(newMax); - - reclaimed += currentMax - newMax; - if (_log.isDebugEnabled()) - { - _log.debug("Reclaiming(1) :" + (currentMax - newMax) + "(" + reclaimed + "/" + requriedSize + ") from queue:" + scavangeIndex); - } - break; - } - else - { - scavangeIndex++; - if (scavangeIndex == index) - { - scavangeIndex++; - } - } - } - - //Second: Just take the free memory we need - if (scavangeIndex == _priorities) - { - scavangeIndex = 0; - if (scavangeIndex == index) - { - scavangeIndex++; - } - - while (scavangeIndex < _priorities && reclaimed <= requriedSize) - { - currentMax = _priorityLists[scavangeIndex].getMemoryUsageMaximum(); - long used = _priorityLists[scavangeIndex].memoryUsed(); - - if (used < currentMax) - { - long newMax = currentMax - used; - - // if there are no messages at this priority just take it all - if (newMax == currentMax) - { - newMax = 0; - } - - _priorityLists[scavangeIndex].setMemoryUsageMaximum(newMax); - - reclaimed += currentMax - newMax; - if (_log.isDebugEnabled()) - { - _log.debug("Reclaiming(2) :" + (currentMax - newMax) + "(" + reclaimed + "/" + requriedSize + ") from queue:" + scavangeIndex); - } - break; - } - else - { - scavangeIndex++; - if (scavangeIndex == index) - { - scavangeIndex++; - } - } - } - - //Third: Take required memory - if (scavangeIndex == _priorities) - { - scavangeIndex = 0; - if (scavangeIndex == index) - { - scavangeIndex++; - } - while (scavangeIndex < _priorities && reclaimed <= requriedSize) - { - currentMax = _priorityLists[scavangeIndex].getMemoryUsageMaximum(); - - if (currentMax > 0 ) - { - long newMax = currentMax; - // Just take the amount of space required for this message. - if (newMax > requriedSize) - { - newMax = requriedSize; - } - _priorityLists[scavangeIndex].setMemoryUsageMaximum(newMax); - - reclaimed += currentMax - newMax; - if (_log.isDebugEnabled()) - { - _log.debug("Reclaiming(3) :" + (currentMax - newMax) + "(" + reclaimed + "/" + requriedSize + ") from queue:" + scavangeIndex); - } - break; - } - else - { - scavangeIndex++; - if (scavangeIndex == index) - { - scavangeIndex++; - } - } - } - } - } - - //Increment Maximum - if (reclaimed > 0) - { - if (_log.isDebugEnabled()) - { - _log.debug("Increasing queue(" + index + ") maximum by " + reclaimed - + " to " + (_priorityLists[index].getMemoryUsageMaximum() + reclaimed)); - } - _priorityLists[index].setMemoryUsageMaximum(_priorityLists[index].getMemoryUsageMaximum() + reclaimed); - } - else - { - _log.debug("No space found."); - } - - if (_log.isTraceEnabled()) - { - showUsage("Add"); - } - } - return _priorityLists[index].add(message); } - @Override - protected void showUsage(String prefix) - { - if (_log.isDebugEnabled()) - { - if (prefix.length() != 0) - { - _log.debug(prefix); - } - for (int index = 0; index < _priorities; index++) - { - QueueEntryList queueEntryList = _priorityLists[index]; - _log.debug("Queue (" + _queue.getName() + ")[" + index + "] usage:" + queueEntryList.memoryUsed() - + "/" + queueEntryList.getMemoryUsageMaximum() - + "/" + queueEntryList.dataSize()); - } - } - } public QueueEntry next(QueueEntry node) { @@ -338,122 +165,7 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement } } - @Override - public boolean isFlowed() - { - boolean flowed = false; - boolean full = true; - - if (_log.isTraceEnabled()) - { - showUsage("isFlowed"); - } - - for (QueueEntryList queueEntryList : _priorityLists) - { - //full = full && queueEntryList.getMemoryUsageMaximum() == queueEntryList.memoryUsed(); - full = full && queueEntryList.getMemoryUsageMaximum() <= queueEntryList.dataSize(); - flowed = flowed || (queueEntryList.isFlowed()); - } - return flowed && full; - } - - @Override - public int size() - { - int size = 0; - for (QueueEntryList queueEntryList : _priorityLists) - { - size += queueEntryList.size(); - } - - return size; - } - - @Override - public long dataSize() - { - int dataSize = 0; - for (QueueEntryList queueEntryList : _priorityLists) - { - dataSize += queueEntryList.dataSize(); - } - - return dataSize; - } - - @Override - public long memoryUsed() - { - int memoryUsed = 0; - for (QueueEntryList queueEntryList : _priorityLists) - { - memoryUsed += queueEntryList.memoryUsed(); - } - - return memoryUsed; - } - - @Override - public void setMemoryUsageMaximum(long maximumMemoryUsage) - { - _memoryUsageMaximum = maximumMemoryUsage; - - if (maximumMemoryUsage >= 0) - { - _disableFlowToDisk = false; - } - - long share = maximumMemoryUsage / _priorities; - - //Apply a share of the maximum To each prioirty quue - for (QueueEntryList queueEntryList : _priorityLists) - { - queueEntryList.setMemoryUsageMaximum(share); - } - - if (maximumMemoryUsage < 0) - { - if (_log.isInfoEnabled()) - { - _log.info("Disabling Flow to Disk for queue:" + _queue.getName()); - } - _disableFlowToDisk = true; - return; - } - - //ensure we use the full allocation of memory - long remainder = maximumMemoryUsage - (share * _priorities); - if (remainder > 0) - { - _priorityLists[_priorities - 1].setMemoryUsageMaximum(share + remainder); - } - } - - @Override - public long getMemoryUsageMaximum() - { - return _memoryUsageMaximum; - } - - @Override - public void setMemoryUsageMinimum(long minimumMemoryUsage) - { - _memoryUsageMinimum = minimumMemoryUsage; - - //Apply a share of the minimum To each prioirty quue - for (QueueEntryList queueEntryList : _priorityLists) - { - queueEntryList.setMemoryUsageMaximum(minimumMemoryUsage / _priorities); - } - } - - @Override - public long getMemoryUsageMinimum() - { - return _memoryUsageMinimum; - } - + @Override public void stop() { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index b6e6365189..e6223ef4ac 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -155,6 +155,13 @@ public class QueueEntryImpl implements QueueEntry return (_flags & DELIVERED_TO_CONSUMER) != 0; } + /** + * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). + * And for selector efficiency. + * + * This is now also used to unload the message if this entry is on a flowed queue. As a result this method should + * only be called after the message has been sent. + */ public void setDeliveredToSubscription() { _flags |= DELIVERED_TO_CONSUMER; diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java index a58c6eaf7d..2bbdf610de 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java @@ -63,4 +63,14 @@ public interface QueueEntryList void entryLoadedUpdateMemory(QueueEntry queueEntry); void stop(); + + /** + * Mark this queue as part of another QueueEntryList for accounting purposes. + * + * All Calls from the QueueEntry to the QueueEntryList need to check if there is + * a parent QueueEntrylist upon which the action should take place. + * + * @param queueEntryList The parent queue that is performing accounting. + */ + void setParentQueueEntryList(FlowableBaseQueueEntryList queueEntryList); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryPriorityTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryPriorityTest.java index 0d6c5948b4..ee3e727b80 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryPriorityTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryPriorityTest.java @@ -50,21 +50,4 @@ public class AMQQueueFactoryPriorityTest extends AMQQueueFactoryTest fail(e.getMessage()); } } - - @Override - public void testQueueValuesAfterCreation() - { - try - { - AMQQueue queue = createQueue(); - - assertEquals("MemoryMaximumSize not set correctly:", MAX_SIZE, queue.getMemoryUsageMaximum()); - //NOTE: Priority queue will show 0 as minimum as the minimum value is actually spread between its sub QELs - assertEquals("MemoryMinimumSize not 0 as expected for a priority queue:", 0, queue.getMemoryUsageMinimum()); - } - catch (AMQException e) - { - fail(e.getMessage()); - } - } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java index 11049a7ae3..0f4230806f 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java @@ -48,11 +48,4 @@ public class MockAMQMessage extends TransientAMQMessage _contentHeaderBody = new ContentHeaderBody(properties, BasicPublishBodyImpl.CLASS_ID); _contentBodies = new ArrayList<ContentChunk>(); } - - - @Override - public long getSize() - { - return 0l; - } } |
