diff options
Diffstat (limited to 'java/broker/src/main')
10 files changed, 532 insertions, 190 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java index 83fcfad1fd..30bf8aba5d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java @@ -114,7 +114,7 @@ public class QueueConfiguration public long getMemoryUsageMaximum() { - return _config.getLong("maximumMemoryUsage", 0); + return _config.getLong("maximumMemoryUsage", -1); } public long getMemoryUsageMinimum() diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java index a4f80a44b4..599f0a8ca4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java @@ -24,7 +24,6 @@ import org.apache.log4j.Logger; import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.server.virtualhost.VirtualHost; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -32,25 +31,29 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; /** This is an abstract base class to handle */ -public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryList +public abstract class FlowableBaseQueueEntryList implements QueueEntryList { - private static final Logger _log = Logger.getLogger(FlowableBaseQueueEntryList.class); + protected static final Logger _log = Logger.getLogger(FlowableBaseQueueEntryList.class); private final AtomicInteger _atomicQueueCount = new AtomicInteger(0); private final AtomicLong _atomicQueueSize = new AtomicLong(0L); - private final AtomicLong _atomicQueueInMemory = new AtomicLong(0L); + protected final AtomicLong _atomicQueueInMemory = new AtomicLong(0L); /** The maximum amount of memory that is allocated to this queue. Beyond this the queue will flow to disk. */ - private long _memoryUsageMaximum = 0; + protected long _memoryUsageMaximum = -1L; /** The minimum amount of memory that is allocated to this queue. If the queueDepth hits this level then more flowed data can be read in. */ - private long _memoryUsageMinimum = 0; + protected long _memoryUsageMinimum = 0; private volatile AtomicBoolean _flowed; private QueueBackingStore _backingStore; protected AMQQueue _queue; private Executor _inhaler; + private Executor _purger; private AtomicBoolean _stopped; private AtomicReference<MessageInhaler> _asynchronousInhaler = new AtomicReference(null); + protected boolean _disabled; + private AtomicReference<MessagePurger> _asynchronousPurger = new AtomicReference(null); + private static final int BATCH_INHALE_COUNT = 100; FlowableBaseQueueEntryList(AMQQueue queue) { @@ -64,6 +67,8 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi _stopped = new AtomicBoolean(false); _inhaler = ReferenceCountingExecutorService.getInstance().acquireExecutorService(); + _purger = ReferenceCountingExecutorService.getInstance().acquireExecutorService(); + _disabled = true; } public void setFlowed(boolean flowed) @@ -71,10 +76,26 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi if (_flowed.get() != flowed) { _log.warn("Marking Queue(" + _queue.getName() + ") as flowed (" + flowed + ")"); + showUsage(); _flowed.set(flowed); } } + protected void showUsage() + { + showUsage(""); + } + + protected void showUsage(String prefix) + { + if (_log.isDebugEnabled()) + { + _log.debug(prefix + " Queue(" + _queue + ":" + _queue.getName() + ") usage:" + memoryUsed() + + "/" + getMemoryUsageMinimum() + "<>" + getMemoryUsageMaximum() + + "/" + dataSize()); + } + } + public boolean isFlowed() { return _flowed.get(); @@ -99,13 +120,15 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi { _memoryUsageMaximum = maximumMemoryUsage; + if (maximumMemoryUsage >= 0) + { + _disabled = false; + } + // Don't attempt to start the inhaler/purger unless we have a minimum value specified. if (_memoryUsageMaximum > 0) { - if (_memoryUsageMinimum == 0) - { - setMemoryUsageMinimum(_memoryUsageMaximum / 2); - } + setMemoryUsageMinimum(_memoryUsageMaximum / 2); // if we have now have to much memory in use we need to purge. if (_memoryUsageMaximum < _atomicQueueInMemory.get()) @@ -113,6 +136,21 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi startPurger(); } } + else if (_memoryUsageMaximum == 0) + { + if (_atomicQueueInMemory.get() > 0) + { + startPurger(); + } + } + else + { + if (_log.isInfoEnabled()) + { + _log.info("Disabling Flow to Disk for queue:" + _queue.getName()); + } + _disabled = true; + } } public long getMemoryUsageMaximum() @@ -134,7 +172,9 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi private void checkAndStartLoader() { // If we've increased the minimum memory above what we have in memory then we need to inhale more - if (_atomicQueueInMemory.get() <= _memoryUsageMinimum) + long inMemory = _atomicQueueInMemory.get(); + // Can't check if inMemory == 0 or we will cause the inhaler thread to continually run. + if (inMemory < _memoryUsageMinimum || _memoryUsageMinimum == 0) { startInhaler(); } @@ -142,22 +182,22 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi private void startInhaler() { - if (_flowed.get()) - { - MessageInhaler inhaler = new MessageInhaler(); + MessageInhaler inhaler = new MessageInhaler(); - if (_asynchronousInhaler.compareAndSet(null, inhaler)) - { - _inhaler.execute(inhaler); - } + if (_asynchronousInhaler.compareAndSet(null, inhaler)) + { + _inhaler.execute(inhaler); } } private void startPurger() { - //TODO create purger, used when maxMemory is reduced creating over memory situation. - _log.warn("Requested Purger Start.. purger TBC."); - //_purger.execute(new MessagePurger(this)); + MessagePurger purger = new MessagePurger(); + + if (_asynchronousPurger.compareAndSet(null, purger)) + { + _purger.execute(purger); + } } public long getMemoryUsageMinimum() @@ -165,26 +205,30 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi return _memoryUsageMinimum; } + /** + * Only to be called by the QueueEntry + * + * @param queueEntry the entry to unload + */ public void unloadEntry(QueueEntry queueEntry) { - try - { - queueEntry.unload(); - _atomicQueueInMemory.addAndGet(-queueEntry.getSize()); - checkAndStartLoader(); - } - catch (UnableToFlowMessageException e) + if (_atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0) { - _atomicQueueInMemory.addAndGet(queueEntry.getSize()); + _log.error("InMemory Count just went below 0:" + queueEntry.debugIdentity()); } + checkAndStartLoader(); } + /** + * Only to be called from the QueueEntry + * + * @param queueEntry the entry to load + */ public void loadEntry(QueueEntry queueEntry) { - queueEntry.load(); - if( _atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum) + if (_atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum) { - _log.error("Loaded to much data!:"+_atomicQueueInMemory.get()+"/"+_memoryUsageMaximum); + _log.error("Loaded to much data!:" + _atomicQueueInMemory.get() + "/" + _memoryUsageMaximum); } } @@ -196,44 +240,21 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi // rather than actively shutdown our threads. //Shutdown thread for inhaler. ReferenceCountingExecutorService.getInstance().releaseExecutorService(); + ReferenceCountingExecutorService.getInstance().releaseExecutorService(); } } - protected boolean willCauseFlowToDisk(QueueEntryImpl queueEntry) - { - return _memoryUsageMaximum != 0 && memoryUsed() + queueEntry.getSize() > _memoryUsageMaximum; - } - protected void incrementCounters(final QueueEntryImpl queueEntry) { _atomicQueueCount.incrementAndGet(); _atomicQueueSize.addAndGet(queueEntry.getSize()); - if (!willCauseFlowToDisk(queueEntry)) - { - _atomicQueueInMemory.addAndGet(queueEntry.getSize()); - } - else - { - setFlowed(true); - flowingToDisk(queueEntry); - } - } + long inUseMemory = _atomicQueueInMemory.addAndGet(queueEntry.getSize()); - /** - * Called when we are now flowing to disk - * - * @param queueEntry the entry that is being flowed to disk - */ - protected void flowingToDisk(QueueEntryImpl queueEntry) - { - try + if (!_disabled && inUseMemory > _memoryUsageMaximum) { + setFlowed(true); queueEntry.unload(); } - catch (UnableToFlowMessageException e) - { - _atomicQueueInMemory.addAndGet(queueEntry.getSize()); - } } protected void dequeued(QueueEntryImpl queueEntry) @@ -242,7 +263,10 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi _atomicQueueSize.addAndGet(-queueEntry.getSize()); if (!queueEntry.isFlowed()) { - _atomicQueueInMemory.addAndGet(-queueEntry.getSize()); + if (_atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0) + { + _log.error("InMemory Count just went below 0 on dequeue."); + } } } @@ -270,32 +294,53 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi private void inhaleList(MessageInhaler messageInhaler) { - _log.info("Inhaler Running"); + if (_log.isInfoEnabled()) + { + _log.info("Inhaler Running:" + _queue.getName()); + showUsage("Inhaler Running:" + _queue.getName()); + } // If in memory count is at or over max then we can't inhale if (_atomicQueueInMemory.get() >= _memoryUsageMaximum) { - _log.debug("Unable to start inhaling as we are already over quota:" + - _atomicQueueInMemory.get() + ">=" + _memoryUsageMaximum); + if (_log.isDebugEnabled()) + { + _log.debug("Unable to start inhaling as we are already over quota:" + + _atomicQueueInMemory.get() + ">=" + _memoryUsageMaximum); + } return; } _asynchronousInhaler.compareAndSet(messageInhaler, null); - while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) && _asynchronousInhaler.compareAndSet(null, messageInhaler)) + int inhaled = 0; + + while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) && (inhaled < BATCH_INHALE_COUNT) + && _asynchronousInhaler.compareAndSet(null, messageInhaler)) { QueueEntryIterator iterator = iterator(); - while (!iterator.getNode().isAvailable() && iterator.advance()) + // If the inhaler is running and delivery rate picks up ensure that we just don't chase the delivery thread. + while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) + && !iterator.getNode().isAvailable() && iterator.advance()) { //Find first AVAILABLE node } - while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) && !iterator.atTail()) + while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) && (inhaled < BATCH_INHALE_COUNT) && !iterator.atTail()) { QueueEntry entry = iterator.getNode(); if (entry.isAvailable() && entry.isFlowed()) { - loadEntry(entry); + if (_atomicQueueInMemory.get() + entry.getSize() > _memoryUsageMaximum) + { + // We don't have space for this message so we need to stop inhaling. + inhaled = BATCH_INHALE_COUNT; + } + else + { + loadEntry(entry); + inhaled++; + } } iterator.advance(); @@ -309,13 +354,100 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi _asynchronousInhaler.set(null); } + if (_log.isInfoEnabled()) + { + _log.info("Inhaler Stopping:" + _queue.getName()); + showUsage("Inhaler Stopping:" + _queue.getName()); + } + //If we have become flowed or have more capacity since we stopped then schedule the thread to run again. if (_flowed.get() && _atomicQueueInMemory.get() < _memoryUsageMaximum) { + if (_log.isInfoEnabled()) + { + _log.info("Rescheduling Inhaler:" + _queue.getName()); + } _inhaler.execute(messageInhaler); - } } + private class MessagePurger implements Runnable + { + public void run() + { + String threadName = Thread.currentThread().getName(); + Thread.currentThread().setName("Purger-" + _queue.getVirtualHost().getName() + "-" + _queue.getName()); + try + { + purgeList(this); + } + finally + { + Thread.currentThread().setName(threadName); + } + } + } + + private void purgeList(MessagePurger messagePurger) + { + // If in memory count is at or over max then we can't inhale + if (_atomicQueueInMemory.get() <= _memoryUsageMinimum) + { + if (_log.isDebugEnabled()) + { + _log.debug("Unable to start purging as we are already below our minimum cache level:" + + _atomicQueueInMemory.get() + "<=" + _memoryUsageMinimum); + } + return; + } + + _asynchronousPurger.compareAndSet(messagePurger, null); + + while ((_atomicQueueInMemory.get() >= _memoryUsageMinimum) && _asynchronousPurger.compareAndSet(null, messagePurger)) + { + QueueEntryIterator iterator = iterator(); + + while (!iterator.getNode().isAvailable() && iterator.advance()) + { + //Find first AVAILABLE node + } + + // Count up the memory usage + long memoryUsage = 0; + while ((memoryUsage < _memoryUsageMaximum) && !iterator.atTail()) + { + QueueEntry entry = iterator.getNode(); + + if (entry.isAvailable() && !entry.isFlowed()) + { + memoryUsage += entry.getSize(); + } + + iterator.advance(); + } + + //Purge remainging mesages on queue + while (!iterator.atTail()) + { + QueueEntry entry = iterator.getNode(); + + if (entry.isAvailable() && !entry.isFlowed()) + { + entry.unload(); + } + + iterator.advance(); + } + + _asynchronousInhaler.set(null); + } + + //If we have become flowed or have more capacity since we stopped then schedule the thread to run again. + if (_flowed.get() && _atomicQueueInMemory.get() < _memoryUsageMaximum) + { + _inhaler.execute(messagePurger); + + } + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java deleted file mode 100644 index b547a41047..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import java.util.concurrent.atomic.AtomicLong; - -public interface FlowableQueueEntryList extends QueueEntryList -{ - void setFlowed(boolean flowed); - - boolean isFlowed(); - - int size(); - - long dataSize(); - - long memoryUsed(); - - void setMemoryUsageMaximum(long maximumMemoryUsage); - - long getMemoryUsageMaximum(); - - void setMemoryUsageMinimum(long minimumMemoryUsage); - - long getMemoryUsageMinimum(); - - /** - * Immediately unload Entry - * @param queueEntry the entry to unload - */ - public void unloadEntry(QueueEntry queueEntry); - - /** - * Immediately load Entry - * @param queueEntry the entry to load - */ - public void loadEntry(QueueEntry queueEntry); - - void stop(); -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java index 23307d8acf..61a3c45c49 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java @@ -25,7 +25,7 @@ import org.apache.qpid.framing.CommonContentHeaderProperties; public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implements QueueEntryList { private final AMQQueue _queue; - private final FlowableQueueEntryList[] _priorityLists; + private final QueueEntryList[] _priorityLists; private final int _priorities; private final int _priorityOffset; @@ -33,13 +33,15 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement { super(queue); _queue = queue; - _priorityLists = new FlowableQueueEntryList[priorities]; + _priorityLists = new QueueEntryList[priorities]; _priorities = priorities; - _priorityOffset = 5-((priorities + 1)/2); - for(int i = 0; i < priorities; i++) + _priorityOffset = 5 - ((priorities + 1) / 2); + for (int i = 0; i < priorities; i++) { _priorityLists[i] = new SimpleQueueEntryList(queue); } + + showUsage("Created:" + _queue.getName()); } public int getPriorities() @@ -54,33 +56,149 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement public QueueEntry add(AMQMessage message) { - int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset; - if(index >= _priorities) + int index = ((CommonContentHeaderProperties) ((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset; + if (index >= _priorities) { - index = _priorities-1; + index = _priorities - 1; } - else if(index < 0) + else if (index < 0) { index = 0; } + + long requriedSize = message.getSize(); + // Check and see if list would flow on adding message + if (!_disabled && !isFlowed() && _priorityLists[index].memoryUsed() + requriedSize > _priorityLists[index].getMemoryUsageMaximum()) + { + if (_log.isDebugEnabled()) + { + _log.debug("Message(" + message.debugIdentity() + ") Add of size (" + + requriedSize + ") will cause flow. Searching for space"); + } + + long reclaimed = 0; + + //work down the priorities looking for memory + int scavangeIndex = _priorities - 1; + + //First: Don't take all the memory. So look for a queue that has more than 50% free + + long currentMax; + + while (scavangeIndex >= 0 && reclaimed <= requriedSize) + { + currentMax = _priorityLists[scavangeIndex].getMemoryUsageMaximum(); + long used = _priorityLists[scavangeIndex].memoryUsed(); + + if (used < currentMax / 2) + { + long newMax = currentMax / 2; + + _priorityLists[scavangeIndex].setMemoryUsageMaximum(newMax); + + reclaimed += currentMax - newMax; + if (_log.isDebugEnabled()) + { + _log.debug("Reclaiming(1) :" + (currentMax - newMax) + "(" + reclaimed + "/" + requriedSize + ") from queue:" + scavangeIndex); + } + break; + } + else + { + scavangeIndex--; + } + } + + //Second: Just take the memory we need + if (scavangeIndex == -1) + { + scavangeIndex = _priorities - 1; + while (scavangeIndex >= 0 && reclaimed <= requriedSize) + { + currentMax = _priorityLists[scavangeIndex].getMemoryUsageMaximum(); + long used = _priorityLists[scavangeIndex].memoryUsed(); + + if (used < currentMax) + { + long newMax = currentMax - used; + + // if there are no messages at this priority just take it all + if (newMax == currentMax) + { + newMax = 0; + } + + _priorityLists[scavangeIndex].setMemoryUsageMaximum(newMax); + + reclaimed += currentMax - newMax; + if (_log.isDebugEnabled()) + { + _log.debug("Reclaiming(2) :" + (currentMax - newMax) + "(" + reclaimed + "/" + requriedSize + ") from queue:" + scavangeIndex); + } + break; + } + else + { + scavangeIndex--; + } + } + } + + //Increment Maximum + if (reclaimed > 0) + { + if (_log.isDebugEnabled()) + { + _log.debug("Increasing queue(" + index + ") maximum by " + reclaimed + + " to " + (_priorityLists[index].getMemoryUsageMaximum() + reclaimed)); + } + _priorityLists[index].setMemoryUsageMaximum(_priorityLists[index].getMemoryUsageMaximum() + reclaimed); + } + else + { + _log.debug("No space found."); + } + + showUsage(); + } + return _priorityLists[index].add(message); } + @Override + protected void showUsage(String prefix) + { + if (_log.isDebugEnabled()) + { + _log.debug(prefix); + for (int index = 0; index < _priorities; index++) + { + QueueEntryList queueEntryList = _priorityLists[index]; + _log.debug("Queue (" + _queue + ":" + _queue.getName() + ")[" + index + "] usage:" + queueEntryList.memoryUsed() + + "/" + queueEntryList.getMemoryUsageMaximum() + + "/" + queueEntryList.dataSize()); + } + } + } + public QueueEntry next(QueueEntry node) { - QueueEntryImpl nodeImpl = (QueueEntryImpl)node; + QueueEntryImpl nodeImpl = (QueueEntryImpl) node; QueueEntry next = nodeImpl.getNext(); - if(next == null) + if (next == null) { QueueEntryList nodeEntryList = nodeImpl.getQueueEntryList(); int index; - for(index = _priorityLists.length-1; _priorityLists[index] != nodeEntryList; index--); + for (index = _priorityLists.length - 1; _priorityLists[index] != nodeEntryList; index--) + { + ; + } - while(next == null && index != 0) + while (next == null && index != 0) { index--; - next = ((QueueEntryImpl)_priorityLists[index].getHead()).getNext(); + next = ((QueueEntryImpl) _priorityLists[index].getHead()).getNext(); } } @@ -89,24 +207,23 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement private final class PriorityQueueEntryListIterator implements QueueEntryIterator { - private final QueueEntryIterator[] _iterators = new QueueEntryIterator[ _priorityLists.length ]; + private final QueueEntryIterator[] _iterators = new QueueEntryIterator[_priorityLists.length]; private QueueEntry _lastNode; PriorityQueueEntryListIterator() { - for(int i = 0; i < _priorityLists.length; i++) + for (int i = 0; i < _priorityLists.length; i++) { _iterators[i] = _priorityLists[i].iterator(); } _lastNode = _iterators[_iterators.length - 1].getNode(); } - public boolean atTail() { - for(int i = 0; i < _iterators.length; i++) + for (int i = 0; i < _iterators.length; i++) { - if(!_iterators[i].atTail()) + if (!_iterators[i].atTail()) { return false; } @@ -121,9 +238,9 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement public boolean advance() { - for(int i = _iterators.length-1; i >= 0; i--) + for (int i = _iterators.length - 1; i >= 0; i--) { - if(_iterators[i].advance()) + if (_iterators[i].advance()) { _lastNode = _iterators[i].getNode(); return true; @@ -140,7 +257,7 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement public QueueEntry getHead() { - return _priorityLists[_priorities-1].getHead(); + return _priorityLists[_priorities - 1].getHead(); } static class Factory implements QueueEntryListFactory @@ -152,17 +269,31 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement _priorities = priorities; } - public FlowableQueueEntryList createQueueEntryList(AMQQueue queue) + public QueueEntryList createQueueEntryList(AMQQueue queue) { return new PriorityQueueEntryList(queue, _priorities); } } @Override + public boolean isFlowed() + { + boolean flowed = false; + boolean full = true; + showUsage(); + for (QueueEntryList queueEntryList : _priorityLists) + { + full = full && queueEntryList.getMemoryUsageMaximum() == queueEntryList.memoryUsed(); + flowed = flowed || (queueEntryList.isFlowed()); + } + return flowed && full; + } + + @Override public int size() { - int size=0; - for (FlowableQueueEntryList queueEntryList : _priorityLists) + int size = 0; + for (QueueEntryList queueEntryList : _priorityLists) { size += queueEntryList.size(); } @@ -170,10 +301,96 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement return size; } + @Override + public long dataSize() + { + int dataSize = 0; + for (QueueEntryList queueEntryList : _priorityLists) + { + dataSize += queueEntryList.dataSize(); + } + + return dataSize; + } + + @Override + public long memoryUsed() + { + int memoryUsed = 0; + for (QueueEntryList queueEntryList : _priorityLists) + { + memoryUsed += queueEntryList.memoryUsed(); + } + + return memoryUsed; + } + + @Override + public void setMemoryUsageMaximum(long maximumMemoryUsage) + { + _memoryUsageMaximum = maximumMemoryUsage; + + if (maximumMemoryUsage >= 0) + { + _disabled = false; + } + + long share = maximumMemoryUsage / _priorities; + + //Apply a share of the maximum To each prioirty quue + for (QueueEntryList queueEntryList : _priorityLists) + { + queueEntryList.setMemoryUsageMaximum(share); + } + + if (maximumMemoryUsage < 0) + { + if (_log.isInfoEnabled()) + { + _log.info("Disabling Flow to Disk for queue:" + _queue.getName()); + } + _disabled = true; + return; + } + + //ensure we use the full allocation of memory + long remainder = maximumMemoryUsage - (share * _priorities); + if (remainder > 0) + { + _priorityLists[_priorities - 1].setMemoryUsageMaximum(share + remainder); + } + } + + @Override + public long getMemoryUsageMaximum() + { + return _memoryUsageMaximum; + } + + @Override + public void setMemoryUsageMinimum(long minimumMemoryUsage) + { + _memoryUsageMinimum = minimumMemoryUsage; + + //Apply a share of the minimum To each prioirty quue + for (QueueEntryList queueEntryList : _priorityLists) + { + queueEntryList.setMemoryUsageMaximum(minimumMemoryUsage / _priorities); + } + } @Override - protected void flowingToDisk(QueueEntryImpl queueEntry) + public long getMemoryUsageMinimum() { - //This class doesn't maintain it's own sizes it delegates to the sub FlowableQueueEntryLists + return _memoryUsageMinimum; + } + + @Override + public void stop() + { + for (QueueEntryList queueEntryList : _priorityLists) + { + queueEntryList.stop(); + } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index 7e41cf53a2..2cb1493a8f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -221,7 +221,7 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept boolean removeStateChangeListener(StateChangeListener listener); - void unload() throws UnableToFlowMessageException; + void unload(); void load(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 8ee03d3d74..2a579b83c6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -393,17 +393,34 @@ public class QueueEntryImpl implements QueueEntry return false; } - public void unload() throws UnableToFlowMessageException + public void unload() { if (_message != null && _backingStore != null) { - if(_log.isDebugEnabled()) + + try { - _log.debug("Unloading:" + debugIdentity()); + _backingStore.unload(_message); + + if(_log.isDebugEnabled()) + { + _log.debug("Unloaded:" + debugIdentity()); + } + + _message = null; + //Update the memoryState if this load call resulted in the message being purged from memory + if (!_flowed.getAndSet(true)) + { + _queueEntryList.unloadEntry(this); + } + + } catch (UnableToFlowMessageException utfme) { + // There is no recovery needed as the memory states remain unchanged. + if(_log.isDebugEnabled()) + { + _log.debug("Unable to Flow message:" + debugIdentity() + ", due to:" + utfme.getMessage()); + } } - _backingStore.unload(_message); - _message = null; - _flowed.getAndSet(true); } } @@ -412,11 +429,17 @@ public class QueueEntryImpl implements QueueEntry if (_messageId != null && _backingStore != null) { _message = _backingStore.load(_messageId); + if(_log.isDebugEnabled()) { - _log.debug("Loading:" + debugIdentity()); + _log.debug("Loaded:" + debugIdentity()); + } + + //Update the memoryState if this load call resulted in the message comming in to memory + if (_flowed.getAndSet(false)) + { + _queueEntryList.loadEntry(this); } - _flowed.getAndSet(false); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java index 313e076f61..706f10dca1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java @@ -1,23 +1,23 @@ /* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.server.queue; public interface QueueEntryList @@ -31,4 +31,36 @@ public interface QueueEntryList QueueEntryIterator iterator(); QueueEntry getHead(); + + void setFlowed(boolean flowed); + + boolean isFlowed(); + + int size(); + + long dataSize(); + + long memoryUsed(); + + void setMemoryUsageMaximum(long maximumMemoryUsage); + + long getMemoryUsageMaximum(); + + void setMemoryUsageMinimum(long minimumMemoryUsage); + + long getMemoryUsageMinimum(); + + /** + * Immediately update memory usage based on the unload of this queueEntry, potentially start inhaler. + * @param queueEntry the entry that has been unloaded + */ + void unloadEntry(QueueEntry queueEntry); + + /** + * Immediately update memory usage based on the load of this queueEntry + * @param queueEntry the entry that has been loaded + */ + void loadEntry(QueueEntry queueEntry); + + void stop(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java index b4a868cf3c..4dbce45f67 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java @@ -22,5 +22,5 @@ package org.apache.qpid.server.queue; interface QueueEntryListFactory { - public FlowableQueueEntryList createQueueEntryList(AMQQueue queue); + public QueueEntryList createQueueEntryList(AMQQueue queue); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index fa67162228..9f1472439c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -79,7 +79,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private volatile Subscription _exclusiveSubscriber; - protected final FlowableQueueEntryList _entries; + protected final QueueEntryList _entries; private final AMQQueueMBean _managedObject; private final Executor _asyncDelivery; @@ -468,8 +468,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener if (entry.isFlowed()) { - _logger.debug("Synchronously loading flowed entry:" + entry.debugIdentity()); - _entries.loadEntry(entry); + entry.load(); } sub.send(entry); @@ -477,7 +476,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // We have delivered this message so we can unload it. if (_entries.isFlowed() && entry.isAcquired() && entry.getDeliveredToConsumer()) { - _entries.unloadEntry(entry); + entry.unload(); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java index c72353db6e..a10e332ef5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java @@ -1,9 +1,6 @@ package org.apache.qpid.server.queue; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; /* * @@ -172,7 +169,7 @@ public class SimpleQueueEntryList extends FlowableBaseQueueEntryList static class Factory implements QueueEntryListFactory { - public FlowableQueueEntryList createQueueEntryList(AMQQueue queue) + public QueueEntryList createQueueEntryList(AMQQueue queue) { return new SimpleQueueEntryList(queue); } |
