diff options
14 files changed, 673 insertions, 211 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java index 83fcfad1fd..30bf8aba5d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java @@ -114,7 +114,7 @@ public class QueueConfiguration public long getMemoryUsageMaximum() { - return _config.getLong("maximumMemoryUsage", 0); + return _config.getLong("maximumMemoryUsage", -1); } public long getMemoryUsageMinimum() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java index a4f80a44b4..599f0a8ca4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java @@ -24,7 +24,6 @@ import org.apache.log4j.Logger; import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.server.virtualhost.VirtualHost; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -32,25 +31,29 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; /** This is an abstract base class to handle */ -public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryList +public abstract class FlowableBaseQueueEntryList implements QueueEntryList { - private static final Logger _log = Logger.getLogger(FlowableBaseQueueEntryList.class); + protected static final Logger _log = Logger.getLogger(FlowableBaseQueueEntryList.class); private final AtomicInteger _atomicQueueCount = new AtomicInteger(0); private final AtomicLong _atomicQueueSize = new AtomicLong(0L); - private final AtomicLong _atomicQueueInMemory = new AtomicLong(0L); + protected final AtomicLong _atomicQueueInMemory = new AtomicLong(0L); /** The maximum amount of memory that is allocated to this queue. Beyond this the queue will flow to disk. */ - private long _memoryUsageMaximum = 0; + protected long _memoryUsageMaximum = -1L; /** The minimum amount of memory that is allocated to this queue. If the queueDepth hits this level then more flowed data can be read in. */ - private long _memoryUsageMinimum = 0; + protected long _memoryUsageMinimum = 0; private volatile AtomicBoolean _flowed; private QueueBackingStore _backingStore; protected AMQQueue _queue; private Executor _inhaler; + private Executor _purger; private AtomicBoolean _stopped; private AtomicReference<MessageInhaler> _asynchronousInhaler = new AtomicReference(null); + protected boolean _disabled; + private AtomicReference<MessagePurger> _asynchronousPurger = new AtomicReference(null); + private static final int BATCH_INHALE_COUNT = 100; FlowableBaseQueueEntryList(AMQQueue queue) { @@ -64,6 +67,8 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi _stopped = new AtomicBoolean(false); _inhaler = ReferenceCountingExecutorService.getInstance().acquireExecutorService(); + _purger = ReferenceCountingExecutorService.getInstance().acquireExecutorService(); + _disabled = true; } public void setFlowed(boolean flowed) @@ -71,10 +76,26 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi if (_flowed.get() != flowed) { _log.warn("Marking Queue(" + _queue.getName() + ") as flowed (" + flowed + ")"); + showUsage(); _flowed.set(flowed); } } + protected void showUsage() + { + showUsage(""); + } + + protected void showUsage(String prefix) + { + if (_log.isDebugEnabled()) + { + _log.debug(prefix + " Queue(" + _queue + ":" + _queue.getName() + ") usage:" + memoryUsed() + + "/" + getMemoryUsageMinimum() + "<>" + getMemoryUsageMaximum() + + "/" + dataSize()); + } + } + public boolean isFlowed() { return _flowed.get(); @@ -99,13 +120,15 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi { _memoryUsageMaximum = maximumMemoryUsage; + if (maximumMemoryUsage >= 0) + { + _disabled = false; + } + // Don't attempt to start the inhaler/purger unless we have a minimum value specified. if (_memoryUsageMaximum > 0) { - if (_memoryUsageMinimum == 0) - { - setMemoryUsageMinimum(_memoryUsageMaximum / 2); - } + setMemoryUsageMinimum(_memoryUsageMaximum / 2); // if we have now have to much memory in use we need to purge. if (_memoryUsageMaximum < _atomicQueueInMemory.get()) @@ -113,6 +136,21 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi startPurger(); } } + else if (_memoryUsageMaximum == 0) + { + if (_atomicQueueInMemory.get() > 0) + { + startPurger(); + } + } + else + { + if (_log.isInfoEnabled()) + { + _log.info("Disabling Flow to Disk for queue:" + _queue.getName()); + } + _disabled = true; + } } public long getMemoryUsageMaximum() @@ -134,7 +172,9 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi private void checkAndStartLoader() { // If we've increased the minimum memory above what we have in memory then we need to inhale more - if (_atomicQueueInMemory.get() <= _memoryUsageMinimum) + long inMemory = _atomicQueueInMemory.get(); + // Can't check if inMemory == 0 or we will cause the inhaler thread to continually run. + if (inMemory < _memoryUsageMinimum || _memoryUsageMinimum == 0) { startInhaler(); } @@ -142,22 +182,22 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi private void startInhaler() { - if (_flowed.get()) - { - MessageInhaler inhaler = new MessageInhaler(); + MessageInhaler inhaler = new MessageInhaler(); - if (_asynchronousInhaler.compareAndSet(null, inhaler)) - { - _inhaler.execute(inhaler); - } + if (_asynchronousInhaler.compareAndSet(null, inhaler)) + { + _inhaler.execute(inhaler); } } private void startPurger() { - //TODO create purger, used when maxMemory is reduced creating over memory situation. - _log.warn("Requested Purger Start.. purger TBC."); - //_purger.execute(new MessagePurger(this)); + MessagePurger purger = new MessagePurger(); + + if (_asynchronousPurger.compareAndSet(null, purger)) + { + _purger.execute(purger); + } } public long getMemoryUsageMinimum() @@ -165,26 +205,30 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi return _memoryUsageMinimum; } + /** + * Only to be called by the QueueEntry + * + * @param queueEntry the entry to unload + */ public void unloadEntry(QueueEntry queueEntry) { - try - { - queueEntry.unload(); - _atomicQueueInMemory.addAndGet(-queueEntry.getSize()); - checkAndStartLoader(); - } - catch (UnableToFlowMessageException e) + if (_atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0) { - _atomicQueueInMemory.addAndGet(queueEntry.getSize()); + _log.error("InMemory Count just went below 0:" + queueEntry.debugIdentity()); } + checkAndStartLoader(); } + /** + * Only to be called from the QueueEntry + * + * @param queueEntry the entry to load + */ public void loadEntry(QueueEntry queueEntry) { - queueEntry.load(); - if( _atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum) + if (_atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum) { - _log.error("Loaded to much data!:"+_atomicQueueInMemory.get()+"/"+_memoryUsageMaximum); + _log.error("Loaded to much data!:" + _atomicQueueInMemory.get() + "/" + _memoryUsageMaximum); } } @@ -196,44 +240,21 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi // rather than actively shutdown our threads. //Shutdown thread for inhaler. ReferenceCountingExecutorService.getInstance().releaseExecutorService(); + ReferenceCountingExecutorService.getInstance().releaseExecutorService(); } } - protected boolean willCauseFlowToDisk(QueueEntryImpl queueEntry) - { - return _memoryUsageMaximum != 0 && memoryUsed() + queueEntry.getSize() > _memoryUsageMaximum; - } - protected void incrementCounters(final QueueEntryImpl queueEntry) { _atomicQueueCount.incrementAndGet(); _atomicQueueSize.addAndGet(queueEntry.getSize()); - if (!willCauseFlowToDisk(queueEntry)) - { - _atomicQueueInMemory.addAndGet(queueEntry.getSize()); - } - else - { - setFlowed(true); - flowingToDisk(queueEntry); - } - } + long inUseMemory = _atomicQueueInMemory.addAndGet(queueEntry.getSize()); - /** - * Called when we are now flowing to disk - * - * @param queueEntry the entry that is being flowed to disk - */ - protected void flowingToDisk(QueueEntryImpl queueEntry) - { - try + if (!_disabled && inUseMemory > _memoryUsageMaximum) { + setFlowed(true); queueEntry.unload(); } - catch (UnableToFlowMessageException e) - { - _atomicQueueInMemory.addAndGet(queueEntry.getSize()); - } } protected void dequeued(QueueEntryImpl queueEntry) @@ -242,7 +263,10 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi _atomicQueueSize.addAndGet(-queueEntry.getSize()); if (!queueEntry.isFlowed()) { - _atomicQueueInMemory.addAndGet(-queueEntry.getSize()); + if (_atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0) + { + _log.error("InMemory Count just went below 0 on dequeue."); + } } } @@ -270,32 +294,53 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi private void inhaleList(MessageInhaler messageInhaler) { - _log.info("Inhaler Running"); + if (_log.isInfoEnabled()) + { + _log.info("Inhaler Running:" + _queue.getName()); + showUsage("Inhaler Running:" + _queue.getName()); + } // If in memory count is at or over max then we can't inhale if (_atomicQueueInMemory.get() >= _memoryUsageMaximum) { - _log.debug("Unable to start inhaling as we are already over quota:" + - _atomicQueueInMemory.get() + ">=" + _memoryUsageMaximum); + if (_log.isDebugEnabled()) + { + _log.debug("Unable to start inhaling as we are already over quota:" + + _atomicQueueInMemory.get() + ">=" + _memoryUsageMaximum); + } return; } _asynchronousInhaler.compareAndSet(messageInhaler, null); - while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) && _asynchronousInhaler.compareAndSet(null, messageInhaler)) + int inhaled = 0; + + while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) && (inhaled < BATCH_INHALE_COUNT) + && _asynchronousInhaler.compareAndSet(null, messageInhaler)) { QueueEntryIterator iterator = iterator(); - while (!iterator.getNode().isAvailable() && iterator.advance()) + // If the inhaler is running and delivery rate picks up ensure that we just don't chase the delivery thread. + while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) + && !iterator.getNode().isAvailable() && iterator.advance()) { //Find first AVAILABLE node } - while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) && !iterator.atTail()) + while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) && (inhaled < BATCH_INHALE_COUNT) && !iterator.atTail()) { QueueEntry entry = iterator.getNode(); if (entry.isAvailable() && entry.isFlowed()) { - loadEntry(entry); + if (_atomicQueueInMemory.get() + entry.getSize() > _memoryUsageMaximum) + { + // We don't have space for this message so we need to stop inhaling. + inhaled = BATCH_INHALE_COUNT; + } + else + { + loadEntry(entry); + inhaled++; + } } iterator.advance(); @@ -309,13 +354,100 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi _asynchronousInhaler.set(null); } + if (_log.isInfoEnabled()) + { + _log.info("Inhaler Stopping:" + _queue.getName()); + showUsage("Inhaler Stopping:" + _queue.getName()); + } + //If we have become flowed or have more capacity since we stopped then schedule the thread to run again. if (_flowed.get() && _atomicQueueInMemory.get() < _memoryUsageMaximum) { + if (_log.isInfoEnabled()) + { + _log.info("Rescheduling Inhaler:" + _queue.getName()); + } _inhaler.execute(messageInhaler); - } } + private class MessagePurger implements Runnable + { + public void run() + { + String threadName = Thread.currentThread().getName(); + Thread.currentThread().setName("Purger-" + _queue.getVirtualHost().getName() + "-" + _queue.getName()); + try + { + purgeList(this); + } + finally + { + Thread.currentThread().setName(threadName); + } + } + } + + private void purgeList(MessagePurger messagePurger) + { + // If in memory count is at or over max then we can't inhale + if (_atomicQueueInMemory.get() <= _memoryUsageMinimum) + { + if (_log.isDebugEnabled()) + { + _log.debug("Unable to start purging as we are already below our minimum cache level:" + + _atomicQueueInMemory.get() + "<=" + _memoryUsageMinimum); + } + return; + } + + _asynchronousPurger.compareAndSet(messagePurger, null); + + while ((_atomicQueueInMemory.get() >= _memoryUsageMinimum) && _asynchronousPurger.compareAndSet(null, messagePurger)) + { + QueueEntryIterator iterator = iterator(); + + while (!iterator.getNode().isAvailable() && iterator.advance()) + { + //Find first AVAILABLE node + } + + // Count up the memory usage + long memoryUsage = 0; + while ((memoryUsage < _memoryUsageMaximum) && !iterator.atTail()) + { + QueueEntry entry = iterator.getNode(); + + if (entry.isAvailable() && !entry.isFlowed()) + { + memoryUsage += entry.getSize(); + } + + iterator.advance(); + } + + //Purge remainging mesages on queue + while (!iterator.atTail()) + { + QueueEntry entry = iterator.getNode(); + + if (entry.isAvailable() && !entry.isFlowed()) + { + entry.unload(); + } + + iterator.advance(); + } + + _asynchronousInhaler.set(null); + } + + //If we have become flowed or have more capacity since we stopped then schedule the thread to run again. + if (_flowed.get() && _atomicQueueInMemory.get() < _memoryUsageMaximum) + { + _inhaler.execute(messagePurger); + + } + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java deleted file mode 100644 index b547a41047..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import java.util.concurrent.atomic.AtomicLong; - -public interface FlowableQueueEntryList extends QueueEntryList -{ - void setFlowed(boolean flowed); - - boolean isFlowed(); - - int size(); - - long dataSize(); - - long memoryUsed(); - - void setMemoryUsageMaximum(long maximumMemoryUsage); - - long getMemoryUsageMaximum(); - - void setMemoryUsageMinimum(long minimumMemoryUsage); - - long getMemoryUsageMinimum(); - - /** - * Immediately unload Entry - * @param queueEntry the entry to unload - */ - public void unloadEntry(QueueEntry queueEntry); - - /** - * Immediately load Entry - * @param queueEntry the entry to load - */ - public void loadEntry(QueueEntry queueEntry); - - void stop(); -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java index 23307d8acf..61a3c45c49 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java @@ -25,7 +25,7 @@ import org.apache.qpid.framing.CommonContentHeaderProperties; public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implements QueueEntryList { private final AMQQueue _queue; - private final FlowableQueueEntryList[] _priorityLists; + private final QueueEntryList[] _priorityLists; private final int _priorities; private final int _priorityOffset; @@ -33,13 +33,15 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement { super(queue); _queue = queue; - _priorityLists = new FlowableQueueEntryList[priorities]; + _priorityLists = new QueueEntryList[priorities]; _priorities = priorities; - _priorityOffset = 5-((priorities + 1)/2); - for(int i = 0; i < priorities; i++) + _priorityOffset = 5 - ((priorities + 1) / 2); + for (int i = 0; i < priorities; i++) { _priorityLists[i] = new SimpleQueueEntryList(queue); } + + showUsage("Created:" + _queue.getName()); } public int getPriorities() @@ -54,33 +56,149 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement public QueueEntry add(AMQMessage message) { - int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset; - if(index >= _priorities) + int index = ((CommonContentHeaderProperties) ((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset; + if (index >= _priorities) { - index = _priorities-1; + index = _priorities - 1; } - else if(index < 0) + else if (index < 0) { index = 0; } + + long requriedSize = message.getSize(); + // Check and see if list would flow on adding message + if (!_disabled && !isFlowed() && _priorityLists[index].memoryUsed() + requriedSize > _priorityLists[index].getMemoryUsageMaximum()) + { + if (_log.isDebugEnabled()) + { + _log.debug("Message(" + message.debugIdentity() + ") Add of size (" + + requriedSize + ") will cause flow. Searching for space"); + } + + long reclaimed = 0; + + //work down the priorities looking for memory + int scavangeIndex = _priorities - 1; + + //First: Don't take all the memory. So look for a queue that has more than 50% free + + long currentMax; + + while (scavangeIndex >= 0 && reclaimed <= requriedSize) + { + currentMax = _priorityLists[scavangeIndex].getMemoryUsageMaximum(); + long used = _priorityLists[scavangeIndex].memoryUsed(); + + if (used < currentMax / 2) + { + long newMax = currentMax / 2; + + _priorityLists[scavangeIndex].setMemoryUsageMaximum(newMax); + + reclaimed += currentMax - newMax; + if (_log.isDebugEnabled()) + { + _log.debug("Reclaiming(1) :" + (currentMax - newMax) + "(" + reclaimed + "/" + requriedSize + ") from queue:" + scavangeIndex); + } + break; + } + else + { + scavangeIndex--; + } + } + + //Second: Just take the memory we need + if (scavangeIndex == -1) + { + scavangeIndex = _priorities - 1; + while (scavangeIndex >= 0 && reclaimed <= requriedSize) + { + currentMax = _priorityLists[scavangeIndex].getMemoryUsageMaximum(); + long used = _priorityLists[scavangeIndex].memoryUsed(); + + if (used < currentMax) + { + long newMax = currentMax - used; + + // if there are no messages at this priority just take it all + if (newMax == currentMax) + { + newMax = 0; + } + + _priorityLists[scavangeIndex].setMemoryUsageMaximum(newMax); + + reclaimed += currentMax - newMax; + if (_log.isDebugEnabled()) + { + _log.debug("Reclaiming(2) :" + (currentMax - newMax) + "(" + reclaimed + "/" + requriedSize + ") from queue:" + scavangeIndex); + } + break; + } + else + { + scavangeIndex--; + } + } + } + + //Increment Maximum + if (reclaimed > 0) + { + if (_log.isDebugEnabled()) + { + _log.debug("Increasing queue(" + index + ") maximum by " + reclaimed + + " to " + (_priorityLists[index].getMemoryUsageMaximum() + reclaimed)); + } + _priorityLists[index].setMemoryUsageMaximum(_priorityLists[index].getMemoryUsageMaximum() + reclaimed); + } + else + { + _log.debug("No space found."); + } + + showUsage(); + } + return _priorityLists[index].add(message); } + @Override + protected void showUsage(String prefix) + { + if (_log.isDebugEnabled()) + { + _log.debug(prefix); + for (int index = 0; index < _priorities; index++) + { + QueueEntryList queueEntryList = _priorityLists[index]; + _log.debug("Queue (" + _queue + ":" + _queue.getName() + ")[" + index + "] usage:" + queueEntryList.memoryUsed() + + "/" + queueEntryList.getMemoryUsageMaximum() + + "/" + queueEntryList.dataSize()); + } + } + } + public QueueEntry next(QueueEntry node) { - QueueEntryImpl nodeImpl = (QueueEntryImpl)node; + QueueEntryImpl nodeImpl = (QueueEntryImpl) node; QueueEntry next = nodeImpl.getNext(); - if(next == null) + if (next == null) { QueueEntryList nodeEntryList = nodeImpl.getQueueEntryList(); int index; - for(index = _priorityLists.length-1; _priorityLists[index] != nodeEntryList; index--); + for (index = _priorityLists.length - 1; _priorityLists[index] != nodeEntryList; index--) + { + ; + } - while(next == null && index != 0) + while (next == null && index != 0) { index--; - next = ((QueueEntryImpl)_priorityLists[index].getHead()).getNext(); + next = ((QueueEntryImpl) _priorityLists[index].getHead()).getNext(); } } @@ -89,24 +207,23 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement private final class PriorityQueueEntryListIterator implements QueueEntryIterator { - private final QueueEntryIterator[] _iterators = new QueueEntryIterator[ _priorityLists.length ]; + private final QueueEntryIterator[] _iterators = new QueueEntryIterator[_priorityLists.length]; private QueueEntry _lastNode; PriorityQueueEntryListIterator() { - for(int i = 0; i < _priorityLists.length; i++) + for (int i = 0; i < _priorityLists.length; i++) { _iterators[i] = _priorityLists[i].iterator(); } _lastNode = _iterators[_iterators.length - 1].getNode(); } - public boolean atTail() { - for(int i = 0; i < _iterators.length; i++) + for (int i = 0; i < _iterators.length; i++) { - if(!_iterators[i].atTail()) + if (!_iterators[i].atTail()) { return false; } @@ -121,9 +238,9 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement public boolean advance() { - for(int i = _iterators.length-1; i >= 0; i--) + for (int i = _iterators.length - 1; i >= 0; i--) { - if(_iterators[i].advance()) + if (_iterators[i].advance()) { _lastNode = _iterators[i].getNode(); return true; @@ -140,7 +257,7 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement public QueueEntry getHead() { - return _priorityLists[_priorities-1].getHead(); + return _priorityLists[_priorities - 1].getHead(); } static class Factory implements QueueEntryListFactory @@ -152,17 +269,31 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement _priorities = priorities; } - public FlowableQueueEntryList createQueueEntryList(AMQQueue queue) + public QueueEntryList createQueueEntryList(AMQQueue queue) { return new PriorityQueueEntryList(queue, _priorities); } } @Override + public boolean isFlowed() + { + boolean flowed = false; + boolean full = true; + showUsage(); + for (QueueEntryList queueEntryList : _priorityLists) + { + full = full && queueEntryList.getMemoryUsageMaximum() == queueEntryList.memoryUsed(); + flowed = flowed || (queueEntryList.isFlowed()); + } + return flowed && full; + } + + @Override public int size() { - int size=0; - for (FlowableQueueEntryList queueEntryList : _priorityLists) + int size = 0; + for (QueueEntryList queueEntryList : _priorityLists) { size += queueEntryList.size(); } @@ -170,10 +301,96 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement return size; } + @Override + public long dataSize() + { + int dataSize = 0; + for (QueueEntryList queueEntryList : _priorityLists) + { + dataSize += queueEntryList.dataSize(); + } + + return dataSize; + } + + @Override + public long memoryUsed() + { + int memoryUsed = 0; + for (QueueEntryList queueEntryList : _priorityLists) + { + memoryUsed += queueEntryList.memoryUsed(); + } + + return memoryUsed; + } + + @Override + public void setMemoryUsageMaximum(long maximumMemoryUsage) + { + _memoryUsageMaximum = maximumMemoryUsage; + + if (maximumMemoryUsage >= 0) + { + _disabled = false; + } + + long share = maximumMemoryUsage / _priorities; + + //Apply a share of the maximum To each prioirty quue + for (QueueEntryList queueEntryList : _priorityLists) + { + queueEntryList.setMemoryUsageMaximum(share); + } + + if (maximumMemoryUsage < 0) + { + if (_log.isInfoEnabled()) + { + _log.info("Disabling Flow to Disk for queue:" + _queue.getName()); + } + _disabled = true; + return; + } + + //ensure we use the full allocation of memory + long remainder = maximumMemoryUsage - (share * _priorities); + if (remainder > 0) + { + _priorityLists[_priorities - 1].setMemoryUsageMaximum(share + remainder); + } + } + + @Override + public long getMemoryUsageMaximum() + { + return _memoryUsageMaximum; + } + + @Override + public void setMemoryUsageMinimum(long minimumMemoryUsage) + { + _memoryUsageMinimum = minimumMemoryUsage; + + //Apply a share of the minimum To each prioirty quue + for (QueueEntryList queueEntryList : _priorityLists) + { + queueEntryList.setMemoryUsageMaximum(minimumMemoryUsage / _priorities); + } + } @Override - protected void flowingToDisk(QueueEntryImpl queueEntry) + public long getMemoryUsageMinimum() { - //This class doesn't maintain it's own sizes it delegates to the sub FlowableQueueEntryLists + return _memoryUsageMinimum; + } + + @Override + public void stop() + { + for (QueueEntryList queueEntryList : _priorityLists) + { + queueEntryList.stop(); + } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index 7e41cf53a2..2cb1493a8f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -221,7 +221,7 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept boolean removeStateChangeListener(StateChangeListener listener); - void unload() throws UnableToFlowMessageException; + void unload(); void load(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 8ee03d3d74..2a579b83c6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -393,17 +393,34 @@ public class QueueEntryImpl implements QueueEntry return false; } - public void unload() throws UnableToFlowMessageException + public void unload() { if (_message != null && _backingStore != null) { - if(_log.isDebugEnabled()) + + try { - _log.debug("Unloading:" + debugIdentity()); + _backingStore.unload(_message); + + if(_log.isDebugEnabled()) + { + _log.debug("Unloaded:" + debugIdentity()); + } + + _message = null; + //Update the memoryState if this load call resulted in the message being purged from memory + if (!_flowed.getAndSet(true)) + { + _queueEntryList.unloadEntry(this); + } + + } catch (UnableToFlowMessageException utfme) { + // There is no recovery needed as the memory states remain unchanged. + if(_log.isDebugEnabled()) + { + _log.debug("Unable to Flow message:" + debugIdentity() + ", due to:" + utfme.getMessage()); + } } - _backingStore.unload(_message); - _message = null; - _flowed.getAndSet(true); } } @@ -412,11 +429,17 @@ public class QueueEntryImpl implements QueueEntry if (_messageId != null && _backingStore != null) { _message = _backingStore.load(_messageId); + if(_log.isDebugEnabled()) { - _log.debug("Loading:" + debugIdentity()); + _log.debug("Loaded:" + debugIdentity()); + } + + //Update the memoryState if this load call resulted in the message comming in to memory + if (_flowed.getAndSet(false)) + { + _queueEntryList.loadEntry(this); } - _flowed.getAndSet(false); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java index 313e076f61..706f10dca1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java @@ -1,23 +1,23 @@ /* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.server.queue; public interface QueueEntryList @@ -31,4 +31,36 @@ public interface QueueEntryList QueueEntryIterator iterator(); QueueEntry getHead(); + + void setFlowed(boolean flowed); + + boolean isFlowed(); + + int size(); + + long dataSize(); + + long memoryUsed(); + + void setMemoryUsageMaximum(long maximumMemoryUsage); + + long getMemoryUsageMaximum(); + + void setMemoryUsageMinimum(long minimumMemoryUsage); + + long getMemoryUsageMinimum(); + + /** + * Immediately update memory usage based on the unload of this queueEntry, potentially start inhaler. + * @param queueEntry the entry that has been unloaded + */ + void unloadEntry(QueueEntry queueEntry); + + /** + * Immediately update memory usage based on the load of this queueEntry + * @param queueEntry the entry that has been loaded + */ + void loadEntry(QueueEntry queueEntry); + + void stop(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java index b4a868cf3c..4dbce45f67 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java @@ -22,5 +22,5 @@ package org.apache.qpid.server.queue; interface QueueEntryListFactory { - public FlowableQueueEntryList createQueueEntryList(AMQQueue queue); + public QueueEntryList createQueueEntryList(AMQQueue queue); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index fa67162228..9f1472439c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -79,7 +79,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private volatile Subscription _exclusiveSubscriber; - protected final FlowableQueueEntryList _entries; + protected final QueueEntryList _entries; private final AMQQueueMBean _managedObject; private final Executor _asyncDelivery; @@ -468,8 +468,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener if (entry.isFlowed()) { - _logger.debug("Synchronously loading flowed entry:" + entry.debugIdentity()); - _entries.loadEntry(entry); + entry.load(); } sub.send(entry); @@ -477,7 +476,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // We have delivered this message so we can unload it. if (_entries.isFlowed() && entry.isAcquired() && entry.getDeliveredToConsumer()) { - _entries.unloadEntry(entry); + entry.unload(); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java index c72353db6e..a10e332ef5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java @@ -1,9 +1,6 @@ package org.apache.qpid.server.queue; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; /* * @@ -172,7 +169,7 @@ public class SimpleQueueEntryList extends FlowableBaseQueueEntryList static class Factory implements QueueEntryListFactory { - public FlowableQueueEntryList createQueueEntryList(AMQQueue queue) + public QueueEntryList createQueueEntryList(AMQQueue queue) { return new SimpleQueueEntryList(queue); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 4716f6691a..c50770d7ba 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -351,7 +351,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase return false; //To change body of implemented methods use File | Settings | File Templates. } - public void unload() throws UnableToFlowMessageException + public void unload() { //To change body of implemented methods use File | Settings | File Templates. } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java index f73bafd3b4..623f57b224 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java @@ -24,18 +24,20 @@ import junit.framework.AssertionFailedError; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.txn.NonTransactionalContext; import java.util.ArrayList; public class AMQPriorityQueueTest extends SimpleAMQQueueTest { - private static final long MESSAGE_SIZE = 100L; + private static final int PRIORITIES = 3; @Override protected void setUp() throws Exception - { + { _arguments = new FieldTable(); - _arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, 3); + _arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, PRIORITIES); super.setUp(); } @@ -84,7 +86,6 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest int index = 1; for (QueueEntry qe : msgs) { - System.err.println(index + ":" + qe.getMessage().getMessageId()); index++; } @@ -96,16 +97,91 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest protected AMQMessage createMessage(byte i) throws AMQException { AMQMessage message = super.createMessage(); - - ((BasicContentHeaderProperties)message.getContentHeaderBody().properties).setPriority(i); + + ((BasicContentHeaderProperties) message.getContentHeaderBody().properties).setPriority(i); return message; } - @Override - public void testMessagesFlowToDisk() throws AMQException, InterruptedException + + public void testMessagesFlowToDiskWithPriority() throws AMQException, InterruptedException { - //Disable this test pending completion of QPID-1637 + int PRIORITIES = 1; + FieldTable arguments = new FieldTable(); + arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, PRIORITIES); + + // Create IncomingMessage and nondurable queue + NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null); + + //Create a priorityQueue + _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testMessagesFlowToDiskWithPriority"), false, _owner, false, _virtualHost, arguments); + + MESSAGE_SIZE = 1; + long MEMORY_MAX = PRIORITIES * 2; + int MESSAGE_COUNT = (int) MEMORY_MAX * 2; + //Set the Memory Usage to be very low + _queue.setMemoryUsageMaximum(MEMORY_MAX); + + for (int msgCount = 0; msgCount < MESSAGE_COUNT / 2; msgCount++) + { + sendMessage(txnContext, (msgCount % 10)); + } + + //Check that we can hold 10 messages without flowing + assertEquals(MESSAGE_COUNT / 2, _queue.getMessageCount()); + assertEquals(MEMORY_MAX, _queue.getMemoryUsageMaximum()); + assertEquals(_queue.getMemoryUsageMaximum(), _queue.getMemoryUsageCurrent()); + assertTrue("Queue is flowed.", !_queue.isFlowed()); + + // Send another and ensure we are flowed + sendMessage(txnContext); + + assertTrue("Queue is not flowed.", _queue.isFlowed()); + assertEquals(MESSAGE_COUNT / 2 + 1, _queue.getMessageCount()); + assertEquals(MESSAGE_COUNT / 2, _queue.getMemoryUsageCurrent()); + + + //send another 99 so there are 200msgs in total on the queue + for (int msgCount = 0; msgCount < (MESSAGE_COUNT / 2) - 1; msgCount++) + { + sendMessage(txnContext); + + long usage = _queue.getMemoryUsageCurrent(); + assertTrue("Queue has gone over quota:" + usage, + usage <= _queue.getMemoryUsageMaximum()); + + assertTrue("Queue has a negative quota:" + usage, usage > 0); + + } + assertEquals(MESSAGE_COUNT, _queue.getMessageCount()); + assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent()); + assertTrue("Queue is not flowed.", _queue.isFlowed()); + + _queue.registerSubscription(_subscription, false); + + int slept = 0; + while (_subscription.getQueueEntries().size() != MESSAGE_COUNT && slept < 10) + { + Thread.sleep(500); + slept++; + } + + //Ensure the messages are retreived + assertEquals("Not all messages were received, slept:" + slept / 2 + "s", MESSAGE_COUNT, _subscription.getQueueEntries().size()); + + //Check the queue is still within it's limits. + assertTrue("Queue has gone over quota:" + _queue.getMemoryUsageCurrent(), + _queue.getMemoryUsageCurrent() <= _queue.getMemoryUsageMaximum()); + + assertTrue("Queue has a negative quota:" + _queue.getMemoryUsageCurrent(), _queue.getMemoryUsageCurrent() >= 0); + + for (int index = 0; index < MESSAGE_COUNT; index++) + { + // Ensure that we have received the messages and it wasn't flushed to disk before we received it. + AMQMessage message = _subscription.getMessages().get(index); + assertNotNull("Message:" + message.debugIdentity() + " was null.", message); + } + } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 0e2b17914c..5d7fa21d56 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -20,7 +20,6 @@ package org.apache.qpid.server.queue; * */ -import junit.framework.AssertionFailedError; import junit.framework.TestCase; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.qpid.AMQException; @@ -60,7 +59,7 @@ public class SimpleAMQQueueTest extends TestCase protected FieldTable _arguments = null; MessagePublishInfo info = new MessagePublishInfoImpl(); - private static long MESSAGE_SIZE = 100; + protected static long MESSAGE_SIZE = 100; @Override protected void setUp() throws Exception @@ -368,7 +367,7 @@ public class SimpleAMQQueueTest extends TestCase long MEMORY_MAX = 500; int MESSAGE_COUNT = (int) MEMORY_MAX * 2; //Set the Memory Usage to be very low - _queue.setMemoryUsageMaximum(MEMORY_MAX); + _queue.setMemoryUsageMaximum(MEMORY_MAX); for (int msgCount = 0; msgCount < MESSAGE_COUNT / 2; msgCount++) { @@ -395,7 +394,7 @@ public class SimpleAMQQueueTest extends TestCase assertTrue("Queue has gone over quota:" + usage, usage <= _queue.getMemoryUsageMaximum()); - assertTrue("Queue has a negative quota:" + usage,usage > 0); + assertTrue("Queue has a negative quota:" + usage, usage > 0); } assertEquals(MESSAGE_COUNT, _queue.getMessageCount()); @@ -412,13 +411,14 @@ public class SimpleAMQQueueTest extends TestCase } //Ensure the messages are retreived - assertEquals("Not all messages were received, slept:"+slept/2+"s", MESSAGE_COUNT, _subscription.getQueueEntries().size()); + assertEquals("Not all messages were received, slept:" + slept / 2 + "s", MESSAGE_COUNT, _subscription.getQueueEntries().size()); //Check the queue is still within it's limits. - assertTrue("Queue has gone over quota:" + _queue.getMemoryUsageCurrent(), - _queue.getMemoryUsageCurrent() <= _queue.getMemoryUsageMaximum()); + long current = _queue.getMemoryUsageCurrent(); + assertTrue("Queue has gone over quota:" + current+"/"+_queue.getMemoryUsageMaximum() , + current <= _queue.getMemoryUsageMaximum()); - assertTrue("Queue has a negative quota:" + _queue.getMemoryUsageCurrent(), _queue.getMemoryUsageCurrent() > 0); + assertTrue("Queue has a negative quota:" + _queue.getMemoryUsageCurrent(), _queue.getMemoryUsageCurrent() >= 0); for (int index = 0; index < MESSAGE_COUNT; index++) { @@ -426,10 +426,52 @@ public class SimpleAMQQueueTest extends TestCase AMQMessage message = _subscription.getMessages().get(index); assertNotNull("Message:" + message.debugIdentity() + " was null.", message); } + } + + public void testMessagesFlowToDiskPurger() throws AMQException, InterruptedException + { + // Create IncomingMessage and nondurable queue + NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null); + + MESSAGE_SIZE = 1; + long MEMORY_MAX = 10; + int MESSAGE_COUNT = (int) MEMORY_MAX; + //Set the Memory Usage to be very low + _queue.setMemoryUsageMaximum(MEMORY_MAX); + for (int msgCount = 0; msgCount < MESSAGE_COUNT; msgCount++) + { + sendMessage(txnContext); + } + + //Check that we can hold all messages without flowing + assertEquals(MESSAGE_COUNT, _queue.getMessageCount()); + assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent()); + assertTrue("Queue is flowed.", !_queue.isFlowed()); + + // Send anothe and ensure we are flowed + sendMessage(txnContext); + assertEquals(MESSAGE_COUNT + 1, _queue.getMessageCount()); + assertEquals(MESSAGE_COUNT , _queue.getMemoryUsageCurrent()); + assertTrue("Queue is not flowed.", _queue.isFlowed()); + + _queue.setMemoryUsageMaximum(0L); + + //Give the purger time to work + Thread.sleep(200); + + assertEquals(MESSAGE_COUNT + 1, _queue.getMessageCount()); + assertEquals(0L , _queue.getMemoryUsageCurrent()); + assertTrue("Queue is not flowed.", _queue.isFlowed()); + + } + + protected void sendMessage(TransactionalContext txnContext) throws AMQException + { + sendMessage(txnContext, 5); } - private void sendMessage(TransactionalContext txnContext) throws AMQException + protected void sendMessage(TransactionalContext txnContext, int priority) throws AMQException { IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_transactionLog), _transactionLog); @@ -438,6 +480,7 @@ public class SimpleAMQQueueTest extends TestCase contentHeaderBody.bodySize = MESSAGE_SIZE; contentHeaderBody.properties = new BasicContentHeaderProperties(); ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2); + ((BasicContentHeaderProperties) contentHeaderBody.properties).setPriority((byte) priority); msg.setContentHeaderBody(contentHeaderBody); long messageId = msg.getMessageId(); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java index 0c33b406e6..40961a3d2e 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java @@ -48,8 +48,9 @@ public class SimpleAMQQueueThreadPoolTest extends TestCase //This is +2 because: // 1 - asyncDelivery Thread - // 2 - queueInhalerThread - assertEquals("References not increased", initialCount + 2, ReferenceCountingExecutorService.getInstance().getReferenceCount()); + // 2 - queue InhalerThread + // 3 - queue PurgerThread + assertEquals("References not increased", initialCount + 3, ReferenceCountingExecutorService.getInstance().getReferenceCount()); queue.stop(); |
