summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java268
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java58
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java269
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java39
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java70
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java5
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java94
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java61
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java5
14 files changed, 673 insertions, 211 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
index 83fcfad1fd..30bf8aba5d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
@@ -114,7 +114,7 @@ public class QueueConfiguration
public long getMemoryUsageMaximum()
{
- return _config.getLong("maximumMemoryUsage", 0);
+ return _config.getLong("maximumMemoryUsage", -1);
}
public long getMemoryUsageMinimum()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
index a4f80a44b4..599f0a8ca4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
@@ -24,7 +24,6 @@ import org.apache.log4j.Logger;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -32,25 +31,29 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
/** This is an abstract base class to handle */
-public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryList
+public abstract class FlowableBaseQueueEntryList implements QueueEntryList
{
- private static final Logger _log = Logger.getLogger(FlowableBaseQueueEntryList.class);
+ protected static final Logger _log = Logger.getLogger(FlowableBaseQueueEntryList.class);
private final AtomicInteger _atomicQueueCount = new AtomicInteger(0);
private final AtomicLong _atomicQueueSize = new AtomicLong(0L);
- private final AtomicLong _atomicQueueInMemory = new AtomicLong(0L);
+ protected final AtomicLong _atomicQueueInMemory = new AtomicLong(0L);
/** The maximum amount of memory that is allocated to this queue. Beyond this the queue will flow to disk. */
- private long _memoryUsageMaximum = 0;
+ protected long _memoryUsageMaximum = -1L;
/** The minimum amount of memory that is allocated to this queue. If the queueDepth hits this level then more flowed data can be read in. */
- private long _memoryUsageMinimum = 0;
+ protected long _memoryUsageMinimum = 0;
private volatile AtomicBoolean _flowed;
private QueueBackingStore _backingStore;
protected AMQQueue _queue;
private Executor _inhaler;
+ private Executor _purger;
private AtomicBoolean _stopped;
private AtomicReference<MessageInhaler> _asynchronousInhaler = new AtomicReference(null);
+ protected boolean _disabled;
+ private AtomicReference<MessagePurger> _asynchronousPurger = new AtomicReference(null);
+ private static final int BATCH_INHALE_COUNT = 100;
FlowableBaseQueueEntryList(AMQQueue queue)
{
@@ -64,6 +67,8 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi
_stopped = new AtomicBoolean(false);
_inhaler = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
+ _purger = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
+ _disabled = true;
}
public void setFlowed(boolean flowed)
@@ -71,10 +76,26 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi
if (_flowed.get() != flowed)
{
_log.warn("Marking Queue(" + _queue.getName() + ") as flowed (" + flowed + ")");
+ showUsage();
_flowed.set(flowed);
}
}
+ protected void showUsage()
+ {
+ showUsage("");
+ }
+
+ protected void showUsage(String prefix)
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(prefix + " Queue(" + _queue + ":" + _queue.getName() + ") usage:" + memoryUsed()
+ + "/" + getMemoryUsageMinimum() + "<>" + getMemoryUsageMaximum()
+ + "/" + dataSize());
+ }
+ }
+
public boolean isFlowed()
{
return _flowed.get();
@@ -99,13 +120,15 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi
{
_memoryUsageMaximum = maximumMemoryUsage;
+ if (maximumMemoryUsage >= 0)
+ {
+ _disabled = false;
+ }
+
// Don't attempt to start the inhaler/purger unless we have a minimum value specified.
if (_memoryUsageMaximum > 0)
{
- if (_memoryUsageMinimum == 0)
- {
- setMemoryUsageMinimum(_memoryUsageMaximum / 2);
- }
+ setMemoryUsageMinimum(_memoryUsageMaximum / 2);
// if we have now have to much memory in use we need to purge.
if (_memoryUsageMaximum < _atomicQueueInMemory.get())
@@ -113,6 +136,21 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi
startPurger();
}
}
+ else if (_memoryUsageMaximum == 0)
+ {
+ if (_atomicQueueInMemory.get() > 0)
+ {
+ startPurger();
+ }
+ }
+ else
+ {
+ if (_log.isInfoEnabled())
+ {
+ _log.info("Disabling Flow to Disk for queue:" + _queue.getName());
+ }
+ _disabled = true;
+ }
}
public long getMemoryUsageMaximum()
@@ -134,7 +172,9 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi
private void checkAndStartLoader()
{
// If we've increased the minimum memory above what we have in memory then we need to inhale more
- if (_atomicQueueInMemory.get() <= _memoryUsageMinimum)
+ long inMemory = _atomicQueueInMemory.get();
+ // Can't check if inMemory == 0 or we will cause the inhaler thread to continually run.
+ if (inMemory < _memoryUsageMinimum || _memoryUsageMinimum == 0)
{
startInhaler();
}
@@ -142,22 +182,22 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi
private void startInhaler()
{
- if (_flowed.get())
- {
- MessageInhaler inhaler = new MessageInhaler();
+ MessageInhaler inhaler = new MessageInhaler();
- if (_asynchronousInhaler.compareAndSet(null, inhaler))
- {
- _inhaler.execute(inhaler);
- }
+ if (_asynchronousInhaler.compareAndSet(null, inhaler))
+ {
+ _inhaler.execute(inhaler);
}
}
private void startPurger()
{
- //TODO create purger, used when maxMemory is reduced creating over memory situation.
- _log.warn("Requested Purger Start.. purger TBC.");
- //_purger.execute(new MessagePurger(this));
+ MessagePurger purger = new MessagePurger();
+
+ if (_asynchronousPurger.compareAndSet(null, purger))
+ {
+ _purger.execute(purger);
+ }
}
public long getMemoryUsageMinimum()
@@ -165,26 +205,30 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi
return _memoryUsageMinimum;
}
+ /**
+ * Only to be called by the QueueEntry
+ *
+ * @param queueEntry the entry to unload
+ */
public void unloadEntry(QueueEntry queueEntry)
{
- try
- {
- queueEntry.unload();
- _atomicQueueInMemory.addAndGet(-queueEntry.getSize());
- checkAndStartLoader();
- }
- catch (UnableToFlowMessageException e)
+ if (_atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0)
{
- _atomicQueueInMemory.addAndGet(queueEntry.getSize());
+ _log.error("InMemory Count just went below 0:" + queueEntry.debugIdentity());
}
+ checkAndStartLoader();
}
+ /**
+ * Only to be called from the QueueEntry
+ *
+ * @param queueEntry the entry to load
+ */
public void loadEntry(QueueEntry queueEntry)
{
- queueEntry.load();
- if( _atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum)
+ if (_atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum)
{
- _log.error("Loaded to much data!:"+_atomicQueueInMemory.get()+"/"+_memoryUsageMaximum);
+ _log.error("Loaded to much data!:" + _atomicQueueInMemory.get() + "/" + _memoryUsageMaximum);
}
}
@@ -196,44 +240,21 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi
// rather than actively shutdown our threads.
//Shutdown thread for inhaler.
ReferenceCountingExecutorService.getInstance().releaseExecutorService();
+ ReferenceCountingExecutorService.getInstance().releaseExecutorService();
}
}
- protected boolean willCauseFlowToDisk(QueueEntryImpl queueEntry)
- {
- return _memoryUsageMaximum != 0 && memoryUsed() + queueEntry.getSize() > _memoryUsageMaximum;
- }
-
protected void incrementCounters(final QueueEntryImpl queueEntry)
{
_atomicQueueCount.incrementAndGet();
_atomicQueueSize.addAndGet(queueEntry.getSize());
- if (!willCauseFlowToDisk(queueEntry))
- {
- _atomicQueueInMemory.addAndGet(queueEntry.getSize());
- }
- else
- {
- setFlowed(true);
- flowingToDisk(queueEntry);
- }
- }
+ long inUseMemory = _atomicQueueInMemory.addAndGet(queueEntry.getSize());
- /**
- * Called when we are now flowing to disk
- *
- * @param queueEntry the entry that is being flowed to disk
- */
- protected void flowingToDisk(QueueEntryImpl queueEntry)
- {
- try
+ if (!_disabled && inUseMemory > _memoryUsageMaximum)
{
+ setFlowed(true);
queueEntry.unload();
}
- catch (UnableToFlowMessageException e)
- {
- _atomicQueueInMemory.addAndGet(queueEntry.getSize());
- }
}
protected void dequeued(QueueEntryImpl queueEntry)
@@ -242,7 +263,10 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi
_atomicQueueSize.addAndGet(-queueEntry.getSize());
if (!queueEntry.isFlowed())
{
- _atomicQueueInMemory.addAndGet(-queueEntry.getSize());
+ if (_atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0)
+ {
+ _log.error("InMemory Count just went below 0 on dequeue.");
+ }
}
}
@@ -270,32 +294,53 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi
private void inhaleList(MessageInhaler messageInhaler)
{
- _log.info("Inhaler Running");
+ if (_log.isInfoEnabled())
+ {
+ _log.info("Inhaler Running:" + _queue.getName());
+ showUsage("Inhaler Running:" + _queue.getName());
+ }
// If in memory count is at or over max then we can't inhale
if (_atomicQueueInMemory.get() >= _memoryUsageMaximum)
{
- _log.debug("Unable to start inhaling as we are already over quota:" +
- _atomicQueueInMemory.get() + ">=" + _memoryUsageMaximum);
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Unable to start inhaling as we are already over quota:" +
+ _atomicQueueInMemory.get() + ">=" + _memoryUsageMaximum);
+ }
return;
}
_asynchronousInhaler.compareAndSet(messageInhaler, null);
- while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) && _asynchronousInhaler.compareAndSet(null, messageInhaler))
+ int inhaled = 0;
+
+ while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) && (inhaled < BATCH_INHALE_COUNT)
+ && _asynchronousInhaler.compareAndSet(null, messageInhaler))
{
QueueEntryIterator iterator = iterator();
- while (!iterator.getNode().isAvailable() && iterator.advance())
+ // If the inhaler is running and delivery rate picks up ensure that we just don't chase the delivery thread.
+ while ((_atomicQueueInMemory.get() < _memoryUsageMaximum)
+ && !iterator.getNode().isAvailable() && iterator.advance())
{
//Find first AVAILABLE node
}
- while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) && !iterator.atTail())
+ while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) && (inhaled < BATCH_INHALE_COUNT) && !iterator.atTail())
{
QueueEntry entry = iterator.getNode();
if (entry.isAvailable() && entry.isFlowed())
{
- loadEntry(entry);
+ if (_atomicQueueInMemory.get() + entry.getSize() > _memoryUsageMaximum)
+ {
+ // We don't have space for this message so we need to stop inhaling.
+ inhaled = BATCH_INHALE_COUNT;
+ }
+ else
+ {
+ loadEntry(entry);
+ inhaled++;
+ }
}
iterator.advance();
@@ -309,13 +354,100 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi
_asynchronousInhaler.set(null);
}
+ if (_log.isInfoEnabled())
+ {
+ _log.info("Inhaler Stopping:" + _queue.getName());
+ showUsage("Inhaler Stopping:" + _queue.getName());
+ }
+
//If we have become flowed or have more capacity since we stopped then schedule the thread to run again.
if (_flowed.get() && _atomicQueueInMemory.get() < _memoryUsageMaximum)
{
+ if (_log.isInfoEnabled())
+ {
+ _log.info("Rescheduling Inhaler:" + _queue.getName());
+ }
_inhaler.execute(messageInhaler);
-
}
}
+ private class MessagePurger implements Runnable
+ {
+ public void run()
+ {
+ String threadName = Thread.currentThread().getName();
+ Thread.currentThread().setName("Purger-" + _queue.getVirtualHost().getName() + "-" + _queue.getName());
+ try
+ {
+ purgeList(this);
+ }
+ finally
+ {
+ Thread.currentThread().setName(threadName);
+ }
+ }
+ }
+
+ private void purgeList(MessagePurger messagePurger)
+ {
+ // If in memory count is at or over max then we can't inhale
+ if (_atomicQueueInMemory.get() <= _memoryUsageMinimum)
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Unable to start purging as we are already below our minimum cache level:" +
+ _atomicQueueInMemory.get() + "<=" + _memoryUsageMinimum);
+ }
+ return;
+ }
+
+ _asynchronousPurger.compareAndSet(messagePurger, null);
+
+ while ((_atomicQueueInMemory.get() >= _memoryUsageMinimum) && _asynchronousPurger.compareAndSet(null, messagePurger))
+ {
+ QueueEntryIterator iterator = iterator();
+
+ while (!iterator.getNode().isAvailable() && iterator.advance())
+ {
+ //Find first AVAILABLE node
+ }
+
+ // Count up the memory usage
+ long memoryUsage = 0;
+ while ((memoryUsage < _memoryUsageMaximum) && !iterator.atTail())
+ {
+ QueueEntry entry = iterator.getNode();
+
+ if (entry.isAvailable() && !entry.isFlowed())
+ {
+ memoryUsage += entry.getSize();
+ }
+
+ iterator.advance();
+ }
+
+ //Purge remainging mesages on queue
+ while (!iterator.atTail())
+ {
+ QueueEntry entry = iterator.getNode();
+
+ if (entry.isAvailable() && !entry.isFlowed())
+ {
+ entry.unload();
+ }
+
+ iterator.advance();
+ }
+
+ _asynchronousInhaler.set(null);
+ }
+
+ //If we have become flowed or have more capacity since we stopped then schedule the thread to run again.
+ if (_flowed.get() && _atomicQueueInMemory.get() < _memoryUsageMaximum)
+ {
+ _inhaler.execute(messagePurger);
+
+ }
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java
deleted file mode 100644
index b547a41047..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-public interface FlowableQueueEntryList extends QueueEntryList
-{
- void setFlowed(boolean flowed);
-
- boolean isFlowed();
-
- int size();
-
- long dataSize();
-
- long memoryUsed();
-
- void setMemoryUsageMaximum(long maximumMemoryUsage);
-
- long getMemoryUsageMaximum();
-
- void setMemoryUsageMinimum(long minimumMemoryUsage);
-
- long getMemoryUsageMinimum();
-
- /**
- * Immediately unload Entry
- * @param queueEntry the entry to unload
- */
- public void unloadEntry(QueueEntry queueEntry);
-
- /**
- * Immediately load Entry
- * @param queueEntry the entry to load
- */
- public void loadEntry(QueueEntry queueEntry);
-
- void stop();
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java
index 23307d8acf..61a3c45c49 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java
@@ -25,7 +25,7 @@ import org.apache.qpid.framing.CommonContentHeaderProperties;
public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implements QueueEntryList
{
private final AMQQueue _queue;
- private final FlowableQueueEntryList[] _priorityLists;
+ private final QueueEntryList[] _priorityLists;
private final int _priorities;
private final int _priorityOffset;
@@ -33,13 +33,15 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement
{
super(queue);
_queue = queue;
- _priorityLists = new FlowableQueueEntryList[priorities];
+ _priorityLists = new QueueEntryList[priorities];
_priorities = priorities;
- _priorityOffset = 5-((priorities + 1)/2);
- for(int i = 0; i < priorities; i++)
+ _priorityOffset = 5 - ((priorities + 1) / 2);
+ for (int i = 0; i < priorities; i++)
{
_priorityLists[i] = new SimpleQueueEntryList(queue);
}
+
+ showUsage("Created:" + _queue.getName());
}
public int getPriorities()
@@ -54,33 +56,149 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement
public QueueEntry add(AMQMessage message)
{
- int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset;
- if(index >= _priorities)
+ int index = ((CommonContentHeaderProperties) ((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset;
+ if (index >= _priorities)
{
- index = _priorities-1;
+ index = _priorities - 1;
}
- else if(index < 0)
+ else if (index < 0)
{
index = 0;
}
+
+ long requriedSize = message.getSize();
+ // Check and see if list would flow on adding message
+ if (!_disabled && !isFlowed() && _priorityLists[index].memoryUsed() + requriedSize > _priorityLists[index].getMemoryUsageMaximum())
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Message(" + message.debugIdentity() + ") Add of size ("
+ + requriedSize + ") will cause flow. Searching for space");
+ }
+
+ long reclaimed = 0;
+
+ //work down the priorities looking for memory
+ int scavangeIndex = _priorities - 1;
+
+ //First: Don't take all the memory. So look for a queue that has more than 50% free
+
+ long currentMax;
+
+ while (scavangeIndex >= 0 && reclaimed <= requriedSize)
+ {
+ currentMax = _priorityLists[scavangeIndex].getMemoryUsageMaximum();
+ long used = _priorityLists[scavangeIndex].memoryUsed();
+
+ if (used < currentMax / 2)
+ {
+ long newMax = currentMax / 2;
+
+ _priorityLists[scavangeIndex].setMemoryUsageMaximum(newMax);
+
+ reclaimed += currentMax - newMax;
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Reclaiming(1) :" + (currentMax - newMax) + "(" + reclaimed + "/" + requriedSize + ") from queue:" + scavangeIndex);
+ }
+ break;
+ }
+ else
+ {
+ scavangeIndex--;
+ }
+ }
+
+ //Second: Just take the memory we need
+ if (scavangeIndex == -1)
+ {
+ scavangeIndex = _priorities - 1;
+ while (scavangeIndex >= 0 && reclaimed <= requriedSize)
+ {
+ currentMax = _priorityLists[scavangeIndex].getMemoryUsageMaximum();
+ long used = _priorityLists[scavangeIndex].memoryUsed();
+
+ if (used < currentMax)
+ {
+ long newMax = currentMax - used;
+
+ // if there are no messages at this priority just take it all
+ if (newMax == currentMax)
+ {
+ newMax = 0;
+ }
+
+ _priorityLists[scavangeIndex].setMemoryUsageMaximum(newMax);
+
+ reclaimed += currentMax - newMax;
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Reclaiming(2) :" + (currentMax - newMax) + "(" + reclaimed + "/" + requriedSize + ") from queue:" + scavangeIndex);
+ }
+ break;
+ }
+ else
+ {
+ scavangeIndex--;
+ }
+ }
+ }
+
+ //Increment Maximum
+ if (reclaimed > 0)
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Increasing queue(" + index + ") maximum by " + reclaimed
+ + " to " + (_priorityLists[index].getMemoryUsageMaximum() + reclaimed));
+ }
+ _priorityLists[index].setMemoryUsageMaximum(_priorityLists[index].getMemoryUsageMaximum() + reclaimed);
+ }
+ else
+ {
+ _log.debug("No space found.");
+ }
+
+ showUsage();
+ }
+
return _priorityLists[index].add(message);
}
+ @Override
+ protected void showUsage(String prefix)
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(prefix);
+ for (int index = 0; index < _priorities; index++)
+ {
+ QueueEntryList queueEntryList = _priorityLists[index];
+ _log.debug("Queue (" + _queue + ":" + _queue.getName() + ")[" + index + "] usage:" + queueEntryList.memoryUsed()
+ + "/" + queueEntryList.getMemoryUsageMaximum()
+ + "/" + queueEntryList.dataSize());
+ }
+ }
+ }
+
public QueueEntry next(QueueEntry node)
{
- QueueEntryImpl nodeImpl = (QueueEntryImpl)node;
+ QueueEntryImpl nodeImpl = (QueueEntryImpl) node;
QueueEntry next = nodeImpl.getNext();
- if(next == null)
+ if (next == null)
{
QueueEntryList nodeEntryList = nodeImpl.getQueueEntryList();
int index;
- for(index = _priorityLists.length-1; _priorityLists[index] != nodeEntryList; index--);
+ for (index = _priorityLists.length - 1; _priorityLists[index] != nodeEntryList; index--)
+ {
+ ;
+ }
- while(next == null && index != 0)
+ while (next == null && index != 0)
{
index--;
- next = ((QueueEntryImpl)_priorityLists[index].getHead()).getNext();
+ next = ((QueueEntryImpl) _priorityLists[index].getHead()).getNext();
}
}
@@ -89,24 +207,23 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement
private final class PriorityQueueEntryListIterator implements QueueEntryIterator
{
- private final QueueEntryIterator[] _iterators = new QueueEntryIterator[ _priorityLists.length ];
+ private final QueueEntryIterator[] _iterators = new QueueEntryIterator[_priorityLists.length];
private QueueEntry _lastNode;
PriorityQueueEntryListIterator()
{
- for(int i = 0; i < _priorityLists.length; i++)
+ for (int i = 0; i < _priorityLists.length; i++)
{
_iterators[i] = _priorityLists[i].iterator();
}
_lastNode = _iterators[_iterators.length - 1].getNode();
}
-
public boolean atTail()
{
- for(int i = 0; i < _iterators.length; i++)
+ for (int i = 0; i < _iterators.length; i++)
{
- if(!_iterators[i].atTail())
+ if (!_iterators[i].atTail())
{
return false;
}
@@ -121,9 +238,9 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement
public boolean advance()
{
- for(int i = _iterators.length-1; i >= 0; i--)
+ for (int i = _iterators.length - 1; i >= 0; i--)
{
- if(_iterators[i].advance())
+ if (_iterators[i].advance())
{
_lastNode = _iterators[i].getNode();
return true;
@@ -140,7 +257,7 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement
public QueueEntry getHead()
{
- return _priorityLists[_priorities-1].getHead();
+ return _priorityLists[_priorities - 1].getHead();
}
static class Factory implements QueueEntryListFactory
@@ -152,17 +269,31 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement
_priorities = priorities;
}
- public FlowableQueueEntryList createQueueEntryList(AMQQueue queue)
+ public QueueEntryList createQueueEntryList(AMQQueue queue)
{
return new PriorityQueueEntryList(queue, _priorities);
}
}
@Override
+ public boolean isFlowed()
+ {
+ boolean flowed = false;
+ boolean full = true;
+ showUsage();
+ for (QueueEntryList queueEntryList : _priorityLists)
+ {
+ full = full && queueEntryList.getMemoryUsageMaximum() == queueEntryList.memoryUsed();
+ flowed = flowed || (queueEntryList.isFlowed());
+ }
+ return flowed && full;
+ }
+
+ @Override
public int size()
{
- int size=0;
- for (FlowableQueueEntryList queueEntryList : _priorityLists)
+ int size = 0;
+ for (QueueEntryList queueEntryList : _priorityLists)
{
size += queueEntryList.size();
}
@@ -170,10 +301,96 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement
return size;
}
+ @Override
+ public long dataSize()
+ {
+ int dataSize = 0;
+ for (QueueEntryList queueEntryList : _priorityLists)
+ {
+ dataSize += queueEntryList.dataSize();
+ }
+
+ return dataSize;
+ }
+
+ @Override
+ public long memoryUsed()
+ {
+ int memoryUsed = 0;
+ for (QueueEntryList queueEntryList : _priorityLists)
+ {
+ memoryUsed += queueEntryList.memoryUsed();
+ }
+
+ return memoryUsed;
+ }
+
+ @Override
+ public void setMemoryUsageMaximum(long maximumMemoryUsage)
+ {
+ _memoryUsageMaximum = maximumMemoryUsage;
+
+ if (maximumMemoryUsage >= 0)
+ {
+ _disabled = false;
+ }
+
+ long share = maximumMemoryUsage / _priorities;
+
+ //Apply a share of the maximum To each prioirty quue
+ for (QueueEntryList queueEntryList : _priorityLists)
+ {
+ queueEntryList.setMemoryUsageMaximum(share);
+ }
+
+ if (maximumMemoryUsage < 0)
+ {
+ if (_log.isInfoEnabled())
+ {
+ _log.info("Disabling Flow to Disk for queue:" + _queue.getName());
+ }
+ _disabled = true;
+ return;
+ }
+
+ //ensure we use the full allocation of memory
+ long remainder = maximumMemoryUsage - (share * _priorities);
+ if (remainder > 0)
+ {
+ _priorityLists[_priorities - 1].setMemoryUsageMaximum(share + remainder);
+ }
+ }
+
+ @Override
+ public long getMemoryUsageMaximum()
+ {
+ return _memoryUsageMaximum;
+ }
+
+ @Override
+ public void setMemoryUsageMinimum(long minimumMemoryUsage)
+ {
+ _memoryUsageMinimum = minimumMemoryUsage;
+
+ //Apply a share of the minimum To each prioirty quue
+ for (QueueEntryList queueEntryList : _priorityLists)
+ {
+ queueEntryList.setMemoryUsageMaximum(minimumMemoryUsage / _priorities);
+ }
+ }
@Override
- protected void flowingToDisk(QueueEntryImpl queueEntry)
+ public long getMemoryUsageMinimum()
{
- //This class doesn't maintain it's own sizes it delegates to the sub FlowableQueueEntryLists
+ return _memoryUsageMinimum;
+ }
+
+ @Override
+ public void stop()
+ {
+ for (QueueEntryList queueEntryList : _priorityLists)
+ {
+ queueEntryList.stop();
+ }
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index 7e41cf53a2..2cb1493a8f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -221,7 +221,7 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept
boolean removeStateChangeListener(StateChangeListener listener);
- void unload() throws UnableToFlowMessageException;
+ void unload();
void load();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 8ee03d3d74..2a579b83c6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -393,17 +393,34 @@ public class QueueEntryImpl implements QueueEntry
return false;
}
- public void unload() throws UnableToFlowMessageException
+ public void unload()
{
if (_message != null && _backingStore != null)
{
- if(_log.isDebugEnabled())
+
+ try
{
- _log.debug("Unloading:" + debugIdentity());
+ _backingStore.unload(_message);
+
+ if(_log.isDebugEnabled())
+ {
+ _log.debug("Unloaded:" + debugIdentity());
+ }
+
+ _message = null;
+ //Update the memoryState if this load call resulted in the message being purged from memory
+ if (!_flowed.getAndSet(true))
+ {
+ _queueEntryList.unloadEntry(this);
+ }
+
+ } catch (UnableToFlowMessageException utfme) {
+ // There is no recovery needed as the memory states remain unchanged.
+ if(_log.isDebugEnabled())
+ {
+ _log.debug("Unable to Flow message:" + debugIdentity() + ", due to:" + utfme.getMessage());
+ }
}
- _backingStore.unload(_message);
- _message = null;
- _flowed.getAndSet(true);
}
}
@@ -412,11 +429,17 @@ public class QueueEntryImpl implements QueueEntry
if (_messageId != null && _backingStore != null)
{
_message = _backingStore.load(_messageId);
+
if(_log.isDebugEnabled())
{
- _log.debug("Loading:" + debugIdentity());
+ _log.debug("Loaded:" + debugIdentity());
+ }
+
+ //Update the memoryState if this load call resulted in the message comming in to memory
+ if (_flowed.getAndSet(false))
+ {
+ _queueEntryList.loadEntry(this);
}
- _flowed.getAndSet(false);
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
index 313e076f61..706f10dca1 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
@@ -1,23 +1,23 @@
/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.server.queue;
public interface QueueEntryList
@@ -31,4 +31,36 @@ public interface QueueEntryList
QueueEntryIterator iterator();
QueueEntry getHead();
+
+ void setFlowed(boolean flowed);
+
+ boolean isFlowed();
+
+ int size();
+
+ long dataSize();
+
+ long memoryUsed();
+
+ void setMemoryUsageMaximum(long maximumMemoryUsage);
+
+ long getMemoryUsageMaximum();
+
+ void setMemoryUsageMinimum(long minimumMemoryUsage);
+
+ long getMemoryUsageMinimum();
+
+ /**
+ * Immediately update memory usage based on the unload of this queueEntry, potentially start inhaler.
+ * @param queueEntry the entry that has been unloaded
+ */
+ void unloadEntry(QueueEntry queueEntry);
+
+ /**
+ * Immediately update memory usage based on the load of this queueEntry
+ * @param queueEntry the entry that has been loaded
+ */
+ void loadEntry(QueueEntry queueEntry);
+
+ void stop();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java
index b4a868cf3c..4dbce45f67 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java
@@ -22,5 +22,5 @@ package org.apache.qpid.server.queue;
interface QueueEntryListFactory
{
- public FlowableQueueEntryList createQueueEntryList(AMQQueue queue);
+ public QueueEntryList createQueueEntryList(AMQQueue queue);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index fa67162228..9f1472439c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -79,7 +79,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private volatile Subscription _exclusiveSubscriber;
- protected final FlowableQueueEntryList _entries;
+ protected final QueueEntryList _entries;
private final AMQQueueMBean _managedObject;
private final Executor _asyncDelivery;
@@ -468,8 +468,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
if (entry.isFlowed())
{
- _logger.debug("Synchronously loading flowed entry:" + entry.debugIdentity());
- _entries.loadEntry(entry);
+ entry.load();
}
sub.send(entry);
@@ -477,7 +476,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
// We have delivered this message so we can unload it.
if (_entries.isFlowed() && entry.isAcquired() && entry.getDeliveredToConsumer())
{
- _entries.unloadEntry(entry);
+ entry.unload();
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
index c72353db6e..a10e332ef5 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
@@ -1,9 +1,6 @@
package org.apache.qpid.server.queue;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
/*
*
@@ -172,7 +169,7 @@ public class SimpleQueueEntryList extends FlowableBaseQueueEntryList
static class Factory implements QueueEntryListFactory
{
- public FlowableQueueEntryList createQueueEntryList(AMQQueue queue)
+ public QueueEntryList createQueueEntryList(AMQQueue queue)
{
return new SimpleQueueEntryList(queue);
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 4716f6691a..c50770d7ba 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -351,7 +351,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
return false; //To change body of implemented methods use File | Settings | File Templates.
}
- public void unload() throws UnableToFlowMessageException
+ public void unload()
{
//To change body of implemented methods use File | Settings | File Templates.
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
index f73bafd3b4..623f57b224 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
@@ -24,18 +24,20 @@ import junit.framework.AssertionFailedError;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.txn.NonTransactionalContext;
import java.util.ArrayList;
public class AMQPriorityQueueTest extends SimpleAMQQueueTest
{
- private static final long MESSAGE_SIZE = 100L;
+ private static final int PRIORITIES = 3;
@Override
protected void setUp() throws Exception
- {
+ {
_arguments = new FieldTable();
- _arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, 3);
+ _arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, PRIORITIES);
super.setUp();
}
@@ -84,7 +86,6 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
int index = 1;
for (QueueEntry qe : msgs)
{
- System.err.println(index + ":" + qe.getMessage().getMessageId());
index++;
}
@@ -96,16 +97,91 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
protected AMQMessage createMessage(byte i) throws AMQException
{
AMQMessage message = super.createMessage();
-
- ((BasicContentHeaderProperties)message.getContentHeaderBody().properties).setPriority(i);
+
+ ((BasicContentHeaderProperties) message.getContentHeaderBody().properties).setPriority(i);
return message;
}
- @Override
- public void testMessagesFlowToDisk() throws AMQException, InterruptedException
+
+ public void testMessagesFlowToDiskWithPriority() throws AMQException, InterruptedException
{
- //Disable this test pending completion of QPID-1637
+ int PRIORITIES = 1;
+ FieldTable arguments = new FieldTable();
+ arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, PRIORITIES);
+
+ // Create IncomingMessage and nondurable queue
+ NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
+
+ //Create a priorityQueue
+ _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testMessagesFlowToDiskWithPriority"), false, _owner, false, _virtualHost, arguments);
+
+ MESSAGE_SIZE = 1;
+ long MEMORY_MAX = PRIORITIES * 2;
+ int MESSAGE_COUNT = (int) MEMORY_MAX * 2;
+ //Set the Memory Usage to be very low
+ _queue.setMemoryUsageMaximum(MEMORY_MAX);
+
+ for (int msgCount = 0; msgCount < MESSAGE_COUNT / 2; msgCount++)
+ {
+ sendMessage(txnContext, (msgCount % 10));
+ }
+
+ //Check that we can hold 10 messages without flowing
+ assertEquals(MESSAGE_COUNT / 2, _queue.getMessageCount());
+ assertEquals(MEMORY_MAX, _queue.getMemoryUsageMaximum());
+ assertEquals(_queue.getMemoryUsageMaximum(), _queue.getMemoryUsageCurrent());
+ assertTrue("Queue is flowed.", !_queue.isFlowed());
+
+ // Send another and ensure we are flowed
+ sendMessage(txnContext);
+
+ assertTrue("Queue is not flowed.", _queue.isFlowed());
+ assertEquals(MESSAGE_COUNT / 2 + 1, _queue.getMessageCount());
+ assertEquals(MESSAGE_COUNT / 2, _queue.getMemoryUsageCurrent());
+
+
+ //send another 99 so there are 200msgs in total on the queue
+ for (int msgCount = 0; msgCount < (MESSAGE_COUNT / 2) - 1; msgCount++)
+ {
+ sendMessage(txnContext);
+
+ long usage = _queue.getMemoryUsageCurrent();
+ assertTrue("Queue has gone over quota:" + usage,
+ usage <= _queue.getMemoryUsageMaximum());
+
+ assertTrue("Queue has a negative quota:" + usage, usage > 0);
+
+ }
+ assertEquals(MESSAGE_COUNT, _queue.getMessageCount());
+ assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent());
+ assertTrue("Queue is not flowed.", _queue.isFlowed());
+
+ _queue.registerSubscription(_subscription, false);
+
+ int slept = 0;
+ while (_subscription.getQueueEntries().size() != MESSAGE_COUNT && slept < 10)
+ {
+ Thread.sleep(500);
+ slept++;
+ }
+
+ //Ensure the messages are retreived
+ assertEquals("Not all messages were received, slept:" + slept / 2 + "s", MESSAGE_COUNT, _subscription.getQueueEntries().size());
+
+ //Check the queue is still within it's limits.
+ assertTrue("Queue has gone over quota:" + _queue.getMemoryUsageCurrent(),
+ _queue.getMemoryUsageCurrent() <= _queue.getMemoryUsageMaximum());
+
+ assertTrue("Queue has a negative quota:" + _queue.getMemoryUsageCurrent(), _queue.getMemoryUsageCurrent() >= 0);
+
+ for (int index = 0; index < MESSAGE_COUNT; index++)
+ {
+ // Ensure that we have received the messages and it wasn't flushed to disk before we received it.
+ AMQMessage message = _subscription.getMessages().get(index);
+ assertNotNull("Message:" + message.debugIdentity() + " was null.", message);
+ }
+
}
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 0e2b17914c..5d7fa21d56 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -20,7 +20,6 @@ package org.apache.qpid.server.queue;
*
*/
-import junit.framework.AssertionFailedError;
import junit.framework.TestCase;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.qpid.AMQException;
@@ -60,7 +59,7 @@ public class SimpleAMQQueueTest extends TestCase
protected FieldTable _arguments = null;
MessagePublishInfo info = new MessagePublishInfoImpl();
- private static long MESSAGE_SIZE = 100;
+ protected static long MESSAGE_SIZE = 100;
@Override
protected void setUp() throws Exception
@@ -368,7 +367,7 @@ public class SimpleAMQQueueTest extends TestCase
long MEMORY_MAX = 500;
int MESSAGE_COUNT = (int) MEMORY_MAX * 2;
//Set the Memory Usage to be very low
- _queue.setMemoryUsageMaximum(MEMORY_MAX);
+ _queue.setMemoryUsageMaximum(MEMORY_MAX);
for (int msgCount = 0; msgCount < MESSAGE_COUNT / 2; msgCount++)
{
@@ -395,7 +394,7 @@ public class SimpleAMQQueueTest extends TestCase
assertTrue("Queue has gone over quota:" + usage,
usage <= _queue.getMemoryUsageMaximum());
- assertTrue("Queue has a negative quota:" + usage,usage > 0);
+ assertTrue("Queue has a negative quota:" + usage, usage > 0);
}
assertEquals(MESSAGE_COUNT, _queue.getMessageCount());
@@ -412,13 +411,14 @@ public class SimpleAMQQueueTest extends TestCase
}
//Ensure the messages are retreived
- assertEquals("Not all messages were received, slept:"+slept/2+"s", MESSAGE_COUNT, _subscription.getQueueEntries().size());
+ assertEquals("Not all messages were received, slept:" + slept / 2 + "s", MESSAGE_COUNT, _subscription.getQueueEntries().size());
//Check the queue is still within it's limits.
- assertTrue("Queue has gone over quota:" + _queue.getMemoryUsageCurrent(),
- _queue.getMemoryUsageCurrent() <= _queue.getMemoryUsageMaximum());
+ long current = _queue.getMemoryUsageCurrent();
+ assertTrue("Queue has gone over quota:" + current+"/"+_queue.getMemoryUsageMaximum() ,
+ current <= _queue.getMemoryUsageMaximum());
- assertTrue("Queue has a negative quota:" + _queue.getMemoryUsageCurrent(), _queue.getMemoryUsageCurrent() > 0);
+ assertTrue("Queue has a negative quota:" + _queue.getMemoryUsageCurrent(), _queue.getMemoryUsageCurrent() >= 0);
for (int index = 0; index < MESSAGE_COUNT; index++)
{
@@ -426,10 +426,52 @@ public class SimpleAMQQueueTest extends TestCase
AMQMessage message = _subscription.getMessages().get(index);
assertNotNull("Message:" + message.debugIdentity() + " was null.", message);
}
+ }
+
+ public void testMessagesFlowToDiskPurger() throws AMQException, InterruptedException
+ {
+ // Create IncomingMessage and nondurable queue
+ NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
+
+ MESSAGE_SIZE = 1;
+ long MEMORY_MAX = 10;
+ int MESSAGE_COUNT = (int) MEMORY_MAX;
+ //Set the Memory Usage to be very low
+ _queue.setMemoryUsageMaximum(MEMORY_MAX);
+ for (int msgCount = 0; msgCount < MESSAGE_COUNT; msgCount++)
+ {
+ sendMessage(txnContext);
+ }
+
+ //Check that we can hold all messages without flowing
+ assertEquals(MESSAGE_COUNT, _queue.getMessageCount());
+ assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent());
+ assertTrue("Queue is flowed.", !_queue.isFlowed());
+
+ // Send anothe and ensure we are flowed
+ sendMessage(txnContext);
+ assertEquals(MESSAGE_COUNT + 1, _queue.getMessageCount());
+ assertEquals(MESSAGE_COUNT , _queue.getMemoryUsageCurrent());
+ assertTrue("Queue is not flowed.", _queue.isFlowed());
+
+ _queue.setMemoryUsageMaximum(0L);
+
+ //Give the purger time to work
+ Thread.sleep(200);
+
+ assertEquals(MESSAGE_COUNT + 1, _queue.getMessageCount());
+ assertEquals(0L , _queue.getMemoryUsageCurrent());
+ assertTrue("Queue is not flowed.", _queue.isFlowed());
+
+ }
+
+ protected void sendMessage(TransactionalContext txnContext) throws AMQException
+ {
+ sendMessage(txnContext, 5);
}
- private void sendMessage(TransactionalContext txnContext) throws AMQException
+ protected void sendMessage(TransactionalContext txnContext, int priority) throws AMQException
{
IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_transactionLog), _transactionLog);
@@ -438,6 +480,7 @@ public class SimpleAMQQueueTest extends TestCase
contentHeaderBody.bodySize = MESSAGE_SIZE;
contentHeaderBody.properties = new BasicContentHeaderProperties();
((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2);
+ ((BasicContentHeaderProperties) contentHeaderBody.properties).setPriority((byte) priority);
msg.setContentHeaderBody(contentHeaderBody);
long messageId = msg.getMessageId();
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
index 0c33b406e6..40961a3d2e 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
@@ -48,8 +48,9 @@ public class SimpleAMQQueueThreadPoolTest extends TestCase
//This is +2 because:
// 1 - asyncDelivery Thread
- // 2 - queueInhalerThread
- assertEquals("References not increased", initialCount + 2, ReferenceCountingExecutorService.getInstance().getReferenceCount());
+ // 2 - queue InhalerThread
+ // 3 - queue PurgerThread
+ assertEquals("References not increased", initialCount + 3, ReferenceCountingExecutorService.getInstance().getReferenceCount());
queue.stop();