summaryrefslogtreecommitdiff
path: root/qpid
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2009-03-02 15:13:57 +0000
committerMartin Ritchie <ritchiem@apache.org>2009-03-02 15:13:57 +0000
commit36d2a03f183eecc1ccb7be9e89a56f4aec3dd945 (patch)
tree35d120911544f87808c8f1b960f7bad3ad6b8b02 /qpid
parentce17ef36e3517d0c5506233d45764818f1a2ad57 (diff)
downloadqpid-python-36d2a03f183eecc1ccb7be9e89a56f4aec3dd945.tar.gz
QPID-1637 : Added Purger thread for Priority Queues and when threasholds are adjusted.
QueueEntries are now the point of entry to load/unload rather than the List. This is because it is only the QueueEntryList that the QueueEntry that is attached to that can correctly account for the inMemory usage. In the Priority Queue case the priority queue does not know which sub list the QueueEntry is on. As the QEI knows it makes sence to request load/unload through the entry. Set the default Maximum InMemory to -1, disabled. Removed the FlowableQueueEntryList interface, merged with QueueEntryList git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@749331 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java268
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java58
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java269
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java39
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java70
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java5
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java94
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java61
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java5
14 files changed, 673 insertions, 211 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
index 83fcfad1fd..30bf8aba5d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
@@ -114,7 +114,7 @@ public class QueueConfiguration
public long getMemoryUsageMaximum()
{
- return _config.getLong("maximumMemoryUsage", 0);
+ return _config.getLong("maximumMemoryUsage", -1);
}
public long getMemoryUsageMinimum()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
index a4f80a44b4..599f0a8ca4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
@@ -24,7 +24,6 @@ import org.apache.log4j.Logger;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -32,25 +31,29 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
/** This is an abstract base class to handle */
-public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryList
+public abstract class FlowableBaseQueueEntryList implements QueueEntryList
{
- private static final Logger _log = Logger.getLogger(FlowableBaseQueueEntryList.class);
+ protected static final Logger _log = Logger.getLogger(FlowableBaseQueueEntryList.class);
private final AtomicInteger _atomicQueueCount = new AtomicInteger(0);
private final AtomicLong _atomicQueueSize = new AtomicLong(0L);
- private final AtomicLong _atomicQueueInMemory = new AtomicLong(0L);
+ protected final AtomicLong _atomicQueueInMemory = new AtomicLong(0L);
/** The maximum amount of memory that is allocated to this queue. Beyond this the queue will flow to disk. */
- private long _memoryUsageMaximum = 0;
+ protected long _memoryUsageMaximum = -1L;
/** The minimum amount of memory that is allocated to this queue. If the queueDepth hits this level then more flowed data can be read in. */
- private long _memoryUsageMinimum = 0;
+ protected long _memoryUsageMinimum = 0;
private volatile AtomicBoolean _flowed;
private QueueBackingStore _backingStore;
protected AMQQueue _queue;
private Executor _inhaler;
+ private Executor _purger;
private AtomicBoolean _stopped;
private AtomicReference<MessageInhaler> _asynchronousInhaler = new AtomicReference(null);
+ protected boolean _disabled;
+ private AtomicReference<MessagePurger> _asynchronousPurger = new AtomicReference(null);
+ private static final int BATCH_INHALE_COUNT = 100;
FlowableBaseQueueEntryList(AMQQueue queue)
{
@@ -64,6 +67,8 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi
_stopped = new AtomicBoolean(false);
_inhaler = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
+ _purger = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
+ _disabled = true;
}
public void setFlowed(boolean flowed)
@@ -71,10 +76,26 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi
if (_flowed.get() != flowed)
{
_log.warn("Marking Queue(" + _queue.getName() + ") as flowed (" + flowed + ")");
+ showUsage();
_flowed.set(flowed);
}
}
+ protected void showUsage()
+ {
+ showUsage("");
+ }
+
+ protected void showUsage(String prefix)
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(prefix + " Queue(" + _queue + ":" + _queue.getName() + ") usage:" + memoryUsed()
+ + "/" + getMemoryUsageMinimum() + "<>" + getMemoryUsageMaximum()
+ + "/" + dataSize());
+ }
+ }
+
public boolean isFlowed()
{
return _flowed.get();
@@ -99,13 +120,15 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi
{
_memoryUsageMaximum = maximumMemoryUsage;
+ if (maximumMemoryUsage >= 0)
+ {
+ _disabled = false;
+ }
+
// Don't attempt to start the inhaler/purger unless we have a minimum value specified.
if (_memoryUsageMaximum > 0)
{
- if (_memoryUsageMinimum == 0)
- {
- setMemoryUsageMinimum(_memoryUsageMaximum / 2);
- }
+ setMemoryUsageMinimum(_memoryUsageMaximum / 2);
// if we have now have to much memory in use we need to purge.
if (_memoryUsageMaximum < _atomicQueueInMemory.get())
@@ -113,6 +136,21 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi
startPurger();
}
}
+ else if (_memoryUsageMaximum == 0)
+ {
+ if (_atomicQueueInMemory.get() > 0)
+ {
+ startPurger();
+ }
+ }
+ else
+ {
+ if (_log.isInfoEnabled())
+ {
+ _log.info("Disabling Flow to Disk for queue:" + _queue.getName());
+ }
+ _disabled = true;
+ }
}
public long getMemoryUsageMaximum()
@@ -134,7 +172,9 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi
private void checkAndStartLoader()
{
// If we've increased the minimum memory above what we have in memory then we need to inhale more
- if (_atomicQueueInMemory.get() <= _memoryUsageMinimum)
+ long inMemory = _atomicQueueInMemory.get();
+ // Can't check if inMemory == 0 or we will cause the inhaler thread to continually run.
+ if (inMemory < _memoryUsageMinimum || _memoryUsageMinimum == 0)
{
startInhaler();
}
@@ -142,22 +182,22 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi
private void startInhaler()
{
- if (_flowed.get())
- {
- MessageInhaler inhaler = new MessageInhaler();
+ MessageInhaler inhaler = new MessageInhaler();
- if (_asynchronousInhaler.compareAndSet(null, inhaler))
- {
- _inhaler.execute(inhaler);
- }
+ if (_asynchronousInhaler.compareAndSet(null, inhaler))
+ {
+ _inhaler.execute(inhaler);
}
}
private void startPurger()
{
- //TODO create purger, used when maxMemory is reduced creating over memory situation.
- _log.warn("Requested Purger Start.. purger TBC.");
- //_purger.execute(new MessagePurger(this));
+ MessagePurger purger = new MessagePurger();
+
+ if (_asynchronousPurger.compareAndSet(null, purger))
+ {
+ _purger.execute(purger);
+ }
}
public long getMemoryUsageMinimum()
@@ -165,26 +205,30 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi
return _memoryUsageMinimum;
}
+ /**
+ * Only to be called by the QueueEntry
+ *
+ * @param queueEntry the entry to unload
+ */
public void unloadEntry(QueueEntry queueEntry)
{
- try
- {
- queueEntry.unload();
- _atomicQueueInMemory.addAndGet(-queueEntry.getSize());
- checkAndStartLoader();
- }
- catch (UnableToFlowMessageException e)
+ if (_atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0)
{
- _atomicQueueInMemory.addAndGet(queueEntry.getSize());
+ _log.error("InMemory Count just went below 0:" + queueEntry.debugIdentity());
}
+ checkAndStartLoader();
}
+ /**
+ * Only to be called from the QueueEntry
+ *
+ * @param queueEntry the entry to load
+ */
public void loadEntry(QueueEntry queueEntry)
{
- queueEntry.load();
- if( _atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum)
+ if (_atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum)
{
- _log.error("Loaded to much data!:"+_atomicQueueInMemory.get()+"/"+_memoryUsageMaximum);
+ _log.error("Loaded to much data!:" + _atomicQueueInMemory.get() + "/" + _memoryUsageMaximum);
}
}
@@ -196,44 +240,21 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi
// rather than actively shutdown our threads.
//Shutdown thread for inhaler.
ReferenceCountingExecutorService.getInstance().releaseExecutorService();
+ ReferenceCountingExecutorService.getInstance().releaseExecutorService();
}
}
- protected boolean willCauseFlowToDisk(QueueEntryImpl queueEntry)
- {
- return _memoryUsageMaximum != 0 && memoryUsed() + queueEntry.getSize() > _memoryUsageMaximum;
- }
-
protected void incrementCounters(final QueueEntryImpl queueEntry)
{
_atomicQueueCount.incrementAndGet();
_atomicQueueSize.addAndGet(queueEntry.getSize());
- if (!willCauseFlowToDisk(queueEntry))
- {
- _atomicQueueInMemory.addAndGet(queueEntry.getSize());
- }
- else
- {
- setFlowed(true);
- flowingToDisk(queueEntry);
- }
- }
+ long inUseMemory = _atomicQueueInMemory.addAndGet(queueEntry.getSize());
- /**
- * Called when we are now flowing to disk
- *
- * @param queueEntry the entry that is being flowed to disk
- */
- protected void flowingToDisk(QueueEntryImpl queueEntry)
- {
- try
+ if (!_disabled && inUseMemory > _memoryUsageMaximum)
{
+ setFlowed(true);
queueEntry.unload();
}
- catch (UnableToFlowMessageException e)
- {
- _atomicQueueInMemory.addAndGet(queueEntry.getSize());
- }
}
protected void dequeued(QueueEntryImpl queueEntry)
@@ -242,7 +263,10 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi
_atomicQueueSize.addAndGet(-queueEntry.getSize());
if (!queueEntry.isFlowed())
{
- _atomicQueueInMemory.addAndGet(-queueEntry.getSize());
+ if (_atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0)
+ {
+ _log.error("InMemory Count just went below 0 on dequeue.");
+ }
}
}
@@ -270,32 +294,53 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi
private void inhaleList(MessageInhaler messageInhaler)
{
- _log.info("Inhaler Running");
+ if (_log.isInfoEnabled())
+ {
+ _log.info("Inhaler Running:" + _queue.getName());
+ showUsage("Inhaler Running:" + _queue.getName());
+ }
// If in memory count is at or over max then we can't inhale
if (_atomicQueueInMemory.get() >= _memoryUsageMaximum)
{
- _log.debug("Unable to start inhaling as we are already over quota:" +
- _atomicQueueInMemory.get() + ">=" + _memoryUsageMaximum);
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Unable to start inhaling as we are already over quota:" +
+ _atomicQueueInMemory.get() + ">=" + _memoryUsageMaximum);
+ }
return;
}
_asynchronousInhaler.compareAndSet(messageInhaler, null);
- while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) && _asynchronousInhaler.compareAndSet(null, messageInhaler))
+ int inhaled = 0;
+
+ while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) && (inhaled < BATCH_INHALE_COUNT)
+ && _asynchronousInhaler.compareAndSet(null, messageInhaler))
{
QueueEntryIterator iterator = iterator();
- while (!iterator.getNode().isAvailable() && iterator.advance())
+ // If the inhaler is running and delivery rate picks up ensure that we just don't chase the delivery thread.
+ while ((_atomicQueueInMemory.get() < _memoryUsageMaximum)
+ && !iterator.getNode().isAvailable() && iterator.advance())
{
//Find first AVAILABLE node
}
- while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) && !iterator.atTail())
+ while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) && (inhaled < BATCH_INHALE_COUNT) && !iterator.atTail())
{
QueueEntry entry = iterator.getNode();
if (entry.isAvailable() && entry.isFlowed())
{
- loadEntry(entry);
+ if (_atomicQueueInMemory.get() + entry.getSize() > _memoryUsageMaximum)
+ {
+ // We don't have space for this message so we need to stop inhaling.
+ inhaled = BATCH_INHALE_COUNT;
+ }
+ else
+ {
+ loadEntry(entry);
+ inhaled++;
+ }
}
iterator.advance();
@@ -309,13 +354,100 @@ public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryLi
_asynchronousInhaler.set(null);
}
+ if (_log.isInfoEnabled())
+ {
+ _log.info("Inhaler Stopping:" + _queue.getName());
+ showUsage("Inhaler Stopping:" + _queue.getName());
+ }
+
//If we have become flowed or have more capacity since we stopped then schedule the thread to run again.
if (_flowed.get() && _atomicQueueInMemory.get() < _memoryUsageMaximum)
{
+ if (_log.isInfoEnabled())
+ {
+ _log.info("Rescheduling Inhaler:" + _queue.getName());
+ }
_inhaler.execute(messageInhaler);
-
}
}
+ private class MessagePurger implements Runnable
+ {
+ public void run()
+ {
+ String threadName = Thread.currentThread().getName();
+ Thread.currentThread().setName("Purger-" + _queue.getVirtualHost().getName() + "-" + _queue.getName());
+ try
+ {
+ purgeList(this);
+ }
+ finally
+ {
+ Thread.currentThread().setName(threadName);
+ }
+ }
+ }
+
+ private void purgeList(MessagePurger messagePurger)
+ {
+ // If in memory count is at or over max then we can't inhale
+ if (_atomicQueueInMemory.get() <= _memoryUsageMinimum)
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Unable to start purging as we are already below our minimum cache level:" +
+ _atomicQueueInMemory.get() + "<=" + _memoryUsageMinimum);
+ }
+ return;
+ }
+
+ _asynchronousPurger.compareAndSet(messagePurger, null);
+
+ while ((_atomicQueueInMemory.get() >= _memoryUsageMinimum) && _asynchronousPurger.compareAndSet(null, messagePurger))
+ {
+ QueueEntryIterator iterator = iterator();
+
+ while (!iterator.getNode().isAvailable() && iterator.advance())
+ {
+ //Find first AVAILABLE node
+ }
+
+ // Count up the memory usage
+ long memoryUsage = 0;
+ while ((memoryUsage < _memoryUsageMaximum) && !iterator.atTail())
+ {
+ QueueEntry entry = iterator.getNode();
+
+ if (entry.isAvailable() && !entry.isFlowed())
+ {
+ memoryUsage += entry.getSize();
+ }
+
+ iterator.advance();
+ }
+
+ //Purge remainging mesages on queue
+ while (!iterator.atTail())
+ {
+ QueueEntry entry = iterator.getNode();
+
+ if (entry.isAvailable() && !entry.isFlowed())
+ {
+ entry.unload();
+ }
+
+ iterator.advance();
+ }
+
+ _asynchronousInhaler.set(null);
+ }
+
+ //If we have become flowed or have more capacity since we stopped then schedule the thread to run again.
+ if (_flowed.get() && _atomicQueueInMemory.get() < _memoryUsageMaximum)
+ {
+ _inhaler.execute(messagePurger);
+
+ }
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java
deleted file mode 100644
index b547a41047..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-public interface FlowableQueueEntryList extends QueueEntryList
-{
- void setFlowed(boolean flowed);
-
- boolean isFlowed();
-
- int size();
-
- long dataSize();
-
- long memoryUsed();
-
- void setMemoryUsageMaximum(long maximumMemoryUsage);
-
- long getMemoryUsageMaximum();
-
- void setMemoryUsageMinimum(long minimumMemoryUsage);
-
- long getMemoryUsageMinimum();
-
- /**
- * Immediately unload Entry
- * @param queueEntry the entry to unload
- */
- public void unloadEntry(QueueEntry queueEntry);
-
- /**
- * Immediately load Entry
- * @param queueEntry the entry to load
- */
- public void loadEntry(QueueEntry queueEntry);
-
- void stop();
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java
index 23307d8acf..61a3c45c49 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java
@@ -25,7 +25,7 @@ import org.apache.qpid.framing.CommonContentHeaderProperties;
public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implements QueueEntryList
{
private final AMQQueue _queue;
- private final FlowableQueueEntryList[] _priorityLists;
+ private final QueueEntryList[] _priorityLists;
private final int _priorities;
private final int _priorityOffset;
@@ -33,13 +33,15 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement
{
super(queue);
_queue = queue;
- _priorityLists = new FlowableQueueEntryList[priorities];
+ _priorityLists = new QueueEntryList[priorities];
_priorities = priorities;
- _priorityOffset = 5-((priorities + 1)/2);
- for(int i = 0; i < priorities; i++)
+ _priorityOffset = 5 - ((priorities + 1) / 2);
+ for (int i = 0; i < priorities; i++)
{
_priorityLists[i] = new SimpleQueueEntryList(queue);
}
+
+ showUsage("Created:" + _queue.getName());
}
public int getPriorities()
@@ -54,33 +56,149 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement
public QueueEntry add(AMQMessage message)
{
- int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset;
- if(index >= _priorities)
+ int index = ((CommonContentHeaderProperties) ((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset;
+ if (index >= _priorities)
{
- index = _priorities-1;
+ index = _priorities - 1;
}
- else if(index < 0)
+ else if (index < 0)
{
index = 0;
}
+
+ long requriedSize = message.getSize();
+ // Check and see if list would flow on adding message
+ if (!_disabled && !isFlowed() && _priorityLists[index].memoryUsed() + requriedSize > _priorityLists[index].getMemoryUsageMaximum())
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Message(" + message.debugIdentity() + ") Add of size ("
+ + requriedSize + ") will cause flow. Searching for space");
+ }
+
+ long reclaimed = 0;
+
+ //work down the priorities looking for memory
+ int scavangeIndex = _priorities - 1;
+
+ //First: Don't take all the memory. So look for a queue that has more than 50% free
+
+ long currentMax;
+
+ while (scavangeIndex >= 0 && reclaimed <= requriedSize)
+ {
+ currentMax = _priorityLists[scavangeIndex].getMemoryUsageMaximum();
+ long used = _priorityLists[scavangeIndex].memoryUsed();
+
+ if (used < currentMax / 2)
+ {
+ long newMax = currentMax / 2;
+
+ _priorityLists[scavangeIndex].setMemoryUsageMaximum(newMax);
+
+ reclaimed += currentMax - newMax;
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Reclaiming(1) :" + (currentMax - newMax) + "(" + reclaimed + "/" + requriedSize + ") from queue:" + scavangeIndex);
+ }
+ break;
+ }
+ else
+ {
+ scavangeIndex--;
+ }
+ }
+
+ //Second: Just take the memory we need
+ if (scavangeIndex == -1)
+ {
+ scavangeIndex = _priorities - 1;
+ while (scavangeIndex >= 0 && reclaimed <= requriedSize)
+ {
+ currentMax = _priorityLists[scavangeIndex].getMemoryUsageMaximum();
+ long used = _priorityLists[scavangeIndex].memoryUsed();
+
+ if (used < currentMax)
+ {
+ long newMax = currentMax - used;
+
+ // if there are no messages at this priority just take it all
+ if (newMax == currentMax)
+ {
+ newMax = 0;
+ }
+
+ _priorityLists[scavangeIndex].setMemoryUsageMaximum(newMax);
+
+ reclaimed += currentMax - newMax;
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Reclaiming(2) :" + (currentMax - newMax) + "(" + reclaimed + "/" + requriedSize + ") from queue:" + scavangeIndex);
+ }
+ break;
+ }
+ else
+ {
+ scavangeIndex--;
+ }
+ }
+ }
+
+ //Increment Maximum
+ if (reclaimed > 0)
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Increasing queue(" + index + ") maximum by " + reclaimed
+ + " to " + (_priorityLists[index].getMemoryUsageMaximum() + reclaimed));
+ }
+ _priorityLists[index].setMemoryUsageMaximum(_priorityLists[index].getMemoryUsageMaximum() + reclaimed);
+ }
+ else
+ {
+ _log.debug("No space found.");
+ }
+
+ showUsage();
+ }
+
return _priorityLists[index].add(message);
}
+ @Override
+ protected void showUsage(String prefix)
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(prefix);
+ for (int index = 0; index < _priorities; index++)
+ {
+ QueueEntryList queueEntryList = _priorityLists[index];
+ _log.debug("Queue (" + _queue + ":" + _queue.getName() + ")[" + index + "] usage:" + queueEntryList.memoryUsed()
+ + "/" + queueEntryList.getMemoryUsageMaximum()
+ + "/" + queueEntryList.dataSize());
+ }
+ }
+ }
+
public QueueEntry next(QueueEntry node)
{
- QueueEntryImpl nodeImpl = (QueueEntryImpl)node;
+ QueueEntryImpl nodeImpl = (QueueEntryImpl) node;
QueueEntry next = nodeImpl.getNext();
- if(next == null)
+ if (next == null)
{
QueueEntryList nodeEntryList = nodeImpl.getQueueEntryList();
int index;
- for(index = _priorityLists.length-1; _priorityLists[index] != nodeEntryList; index--);
+ for (index = _priorityLists.length - 1; _priorityLists[index] != nodeEntryList; index--)
+ {
+ ;
+ }
- while(next == null && index != 0)
+ while (next == null && index != 0)
{
index--;
- next = ((QueueEntryImpl)_priorityLists[index].getHead()).getNext();
+ next = ((QueueEntryImpl) _priorityLists[index].getHead()).getNext();
}
}
@@ -89,24 +207,23 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement
private final class PriorityQueueEntryListIterator implements QueueEntryIterator
{
- private final QueueEntryIterator[] _iterators = new QueueEntryIterator[ _priorityLists.length ];
+ private final QueueEntryIterator[] _iterators = new QueueEntryIterator[_priorityLists.length];
private QueueEntry _lastNode;
PriorityQueueEntryListIterator()
{
- for(int i = 0; i < _priorityLists.length; i++)
+ for (int i = 0; i < _priorityLists.length; i++)
{
_iterators[i] = _priorityLists[i].iterator();
}
_lastNode = _iterators[_iterators.length - 1].getNode();
}
-
public boolean atTail()
{
- for(int i = 0; i < _iterators.length; i++)
+ for (int i = 0; i < _iterators.length; i++)
{
- if(!_iterators[i].atTail())
+ if (!_iterators[i].atTail())
{
return false;
}
@@ -121,9 +238,9 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement
public boolean advance()
{
- for(int i = _iterators.length-1; i >= 0; i--)
+ for (int i = _iterators.length - 1; i >= 0; i--)
{
- if(_iterators[i].advance())
+ if (_iterators[i].advance())
{
_lastNode = _iterators[i].getNode();
return true;
@@ -140,7 +257,7 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement
public QueueEntry getHead()
{
- return _priorityLists[_priorities-1].getHead();
+ return _priorityLists[_priorities - 1].getHead();
}
static class Factory implements QueueEntryListFactory
@@ -152,17 +269,31 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement
_priorities = priorities;
}
- public FlowableQueueEntryList createQueueEntryList(AMQQueue queue)
+ public QueueEntryList createQueueEntryList(AMQQueue queue)
{
return new PriorityQueueEntryList(queue, _priorities);
}
}
@Override
+ public boolean isFlowed()
+ {
+ boolean flowed = false;
+ boolean full = true;
+ showUsage();
+ for (QueueEntryList queueEntryList : _priorityLists)
+ {
+ full = full && queueEntryList.getMemoryUsageMaximum() == queueEntryList.memoryUsed();
+ flowed = flowed || (queueEntryList.isFlowed());
+ }
+ return flowed && full;
+ }
+
+ @Override
public int size()
{
- int size=0;
- for (FlowableQueueEntryList queueEntryList : _priorityLists)
+ int size = 0;
+ for (QueueEntryList queueEntryList : _priorityLists)
{
size += queueEntryList.size();
}
@@ -170,10 +301,96 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement
return size;
}
+ @Override
+ public long dataSize()
+ {
+ int dataSize = 0;
+ for (QueueEntryList queueEntryList : _priorityLists)
+ {
+ dataSize += queueEntryList.dataSize();
+ }
+
+ return dataSize;
+ }
+
+ @Override
+ public long memoryUsed()
+ {
+ int memoryUsed = 0;
+ for (QueueEntryList queueEntryList : _priorityLists)
+ {
+ memoryUsed += queueEntryList.memoryUsed();
+ }
+
+ return memoryUsed;
+ }
+
+ @Override
+ public void setMemoryUsageMaximum(long maximumMemoryUsage)
+ {
+ _memoryUsageMaximum = maximumMemoryUsage;
+
+ if (maximumMemoryUsage >= 0)
+ {
+ _disabled = false;
+ }
+
+ long share = maximumMemoryUsage / _priorities;
+
+ //Apply a share of the maximum To each prioirty quue
+ for (QueueEntryList queueEntryList : _priorityLists)
+ {
+ queueEntryList.setMemoryUsageMaximum(share);
+ }
+
+ if (maximumMemoryUsage < 0)
+ {
+ if (_log.isInfoEnabled())
+ {
+ _log.info("Disabling Flow to Disk for queue:" + _queue.getName());
+ }
+ _disabled = true;
+ return;
+ }
+
+ //ensure we use the full allocation of memory
+ long remainder = maximumMemoryUsage - (share * _priorities);
+ if (remainder > 0)
+ {
+ _priorityLists[_priorities - 1].setMemoryUsageMaximum(share + remainder);
+ }
+ }
+
+ @Override
+ public long getMemoryUsageMaximum()
+ {
+ return _memoryUsageMaximum;
+ }
+
+ @Override
+ public void setMemoryUsageMinimum(long minimumMemoryUsage)
+ {
+ _memoryUsageMinimum = minimumMemoryUsage;
+
+ //Apply a share of the minimum To each prioirty quue
+ for (QueueEntryList queueEntryList : _priorityLists)
+ {
+ queueEntryList.setMemoryUsageMaximum(minimumMemoryUsage / _priorities);
+ }
+ }
@Override
- protected void flowingToDisk(QueueEntryImpl queueEntry)
+ public long getMemoryUsageMinimum()
{
- //This class doesn't maintain it's own sizes it delegates to the sub FlowableQueueEntryLists
+ return _memoryUsageMinimum;
+ }
+
+ @Override
+ public void stop()
+ {
+ for (QueueEntryList queueEntryList : _priorityLists)
+ {
+ queueEntryList.stop();
+ }
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index 7e41cf53a2..2cb1493a8f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -221,7 +221,7 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept
boolean removeStateChangeListener(StateChangeListener listener);
- void unload() throws UnableToFlowMessageException;
+ void unload();
void load();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 8ee03d3d74..2a579b83c6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -393,17 +393,34 @@ public class QueueEntryImpl implements QueueEntry
return false;
}
- public void unload() throws UnableToFlowMessageException
+ public void unload()
{
if (_message != null && _backingStore != null)
{
- if(_log.isDebugEnabled())
+
+ try
{
- _log.debug("Unloading:" + debugIdentity());
+ _backingStore.unload(_message);
+
+ if(_log.isDebugEnabled())
+ {
+ _log.debug("Unloaded:" + debugIdentity());
+ }
+
+ _message = null;
+ //Update the memoryState if this load call resulted in the message being purged from memory
+ if (!_flowed.getAndSet(true))
+ {
+ _queueEntryList.unloadEntry(this);
+ }
+
+ } catch (UnableToFlowMessageException utfme) {
+ // There is no recovery needed as the memory states remain unchanged.
+ if(_log.isDebugEnabled())
+ {
+ _log.debug("Unable to Flow message:" + debugIdentity() + ", due to:" + utfme.getMessage());
+ }
}
- _backingStore.unload(_message);
- _message = null;
- _flowed.getAndSet(true);
}
}
@@ -412,11 +429,17 @@ public class QueueEntryImpl implements QueueEntry
if (_messageId != null && _backingStore != null)
{
_message = _backingStore.load(_messageId);
+
if(_log.isDebugEnabled())
{
- _log.debug("Loading:" + debugIdentity());
+ _log.debug("Loaded:" + debugIdentity());
+ }
+
+ //Update the memoryState if this load call resulted in the message comming in to memory
+ if (_flowed.getAndSet(false))
+ {
+ _queueEntryList.loadEntry(this);
}
- _flowed.getAndSet(false);
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
index 313e076f61..706f10dca1 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
@@ -1,23 +1,23 @@
/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.server.queue;
public interface QueueEntryList
@@ -31,4 +31,36 @@ public interface QueueEntryList
QueueEntryIterator iterator();
QueueEntry getHead();
+
+ void setFlowed(boolean flowed);
+
+ boolean isFlowed();
+
+ int size();
+
+ long dataSize();
+
+ long memoryUsed();
+
+ void setMemoryUsageMaximum(long maximumMemoryUsage);
+
+ long getMemoryUsageMaximum();
+
+ void setMemoryUsageMinimum(long minimumMemoryUsage);
+
+ long getMemoryUsageMinimum();
+
+ /**
+ * Immediately update memory usage based on the unload of this queueEntry, potentially start inhaler.
+ * @param queueEntry the entry that has been unloaded
+ */
+ void unloadEntry(QueueEntry queueEntry);
+
+ /**
+ * Immediately update memory usage based on the load of this queueEntry
+ * @param queueEntry the entry that has been loaded
+ */
+ void loadEntry(QueueEntry queueEntry);
+
+ void stop();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java
index b4a868cf3c..4dbce45f67 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java
@@ -22,5 +22,5 @@ package org.apache.qpid.server.queue;
interface QueueEntryListFactory
{
- public FlowableQueueEntryList createQueueEntryList(AMQQueue queue);
+ public QueueEntryList createQueueEntryList(AMQQueue queue);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index fa67162228..9f1472439c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -79,7 +79,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private volatile Subscription _exclusiveSubscriber;
- protected final FlowableQueueEntryList _entries;
+ protected final QueueEntryList _entries;
private final AMQQueueMBean _managedObject;
private final Executor _asyncDelivery;
@@ -468,8 +468,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
if (entry.isFlowed())
{
- _logger.debug("Synchronously loading flowed entry:" + entry.debugIdentity());
- _entries.loadEntry(entry);
+ entry.load();
}
sub.send(entry);
@@ -477,7 +476,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
// We have delivered this message so we can unload it.
if (_entries.isFlowed() && entry.isAcquired() && entry.getDeliveredToConsumer())
{
- _entries.unloadEntry(entry);
+ entry.unload();
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
index c72353db6e..a10e332ef5 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
@@ -1,9 +1,6 @@
package org.apache.qpid.server.queue;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
/*
*
@@ -172,7 +169,7 @@ public class SimpleQueueEntryList extends FlowableBaseQueueEntryList
static class Factory implements QueueEntryListFactory
{
- public FlowableQueueEntryList createQueueEntryList(AMQQueue queue)
+ public QueueEntryList createQueueEntryList(AMQQueue queue)
{
return new SimpleQueueEntryList(queue);
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 4716f6691a..c50770d7ba 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -351,7 +351,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
return false; //To change body of implemented methods use File | Settings | File Templates.
}
- public void unload() throws UnableToFlowMessageException
+ public void unload()
{
//To change body of implemented methods use File | Settings | File Templates.
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
index f73bafd3b4..623f57b224 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
@@ -24,18 +24,20 @@ import junit.framework.AssertionFailedError;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.txn.NonTransactionalContext;
import java.util.ArrayList;
public class AMQPriorityQueueTest extends SimpleAMQQueueTest
{
- private static final long MESSAGE_SIZE = 100L;
+ private static final int PRIORITIES = 3;
@Override
protected void setUp() throws Exception
- {
+ {
_arguments = new FieldTable();
- _arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, 3);
+ _arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, PRIORITIES);
super.setUp();
}
@@ -84,7 +86,6 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
int index = 1;
for (QueueEntry qe : msgs)
{
- System.err.println(index + ":" + qe.getMessage().getMessageId());
index++;
}
@@ -96,16 +97,91 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
protected AMQMessage createMessage(byte i) throws AMQException
{
AMQMessage message = super.createMessage();
-
- ((BasicContentHeaderProperties)message.getContentHeaderBody().properties).setPriority(i);
+
+ ((BasicContentHeaderProperties) message.getContentHeaderBody().properties).setPriority(i);
return message;
}
- @Override
- public void testMessagesFlowToDisk() throws AMQException, InterruptedException
+
+ public void testMessagesFlowToDiskWithPriority() throws AMQException, InterruptedException
{
- //Disable this test pending completion of QPID-1637
+ int PRIORITIES = 1;
+ FieldTable arguments = new FieldTable();
+ arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, PRIORITIES);
+
+ // Create IncomingMessage and nondurable queue
+ NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
+
+ //Create a priorityQueue
+ _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testMessagesFlowToDiskWithPriority"), false, _owner, false, _virtualHost, arguments);
+
+ MESSAGE_SIZE = 1;
+ long MEMORY_MAX = PRIORITIES * 2;
+ int MESSAGE_COUNT = (int) MEMORY_MAX * 2;
+ //Set the Memory Usage to be very low
+ _queue.setMemoryUsageMaximum(MEMORY_MAX);
+
+ for (int msgCount = 0; msgCount < MESSAGE_COUNT / 2; msgCount++)
+ {
+ sendMessage(txnContext, (msgCount % 10));
+ }
+
+ //Check that we can hold 10 messages without flowing
+ assertEquals(MESSAGE_COUNT / 2, _queue.getMessageCount());
+ assertEquals(MEMORY_MAX, _queue.getMemoryUsageMaximum());
+ assertEquals(_queue.getMemoryUsageMaximum(), _queue.getMemoryUsageCurrent());
+ assertTrue("Queue is flowed.", !_queue.isFlowed());
+
+ // Send another and ensure we are flowed
+ sendMessage(txnContext);
+
+ assertTrue("Queue is not flowed.", _queue.isFlowed());
+ assertEquals(MESSAGE_COUNT / 2 + 1, _queue.getMessageCount());
+ assertEquals(MESSAGE_COUNT / 2, _queue.getMemoryUsageCurrent());
+
+
+ //send another 99 so there are 200msgs in total on the queue
+ for (int msgCount = 0; msgCount < (MESSAGE_COUNT / 2) - 1; msgCount++)
+ {
+ sendMessage(txnContext);
+
+ long usage = _queue.getMemoryUsageCurrent();
+ assertTrue("Queue has gone over quota:" + usage,
+ usage <= _queue.getMemoryUsageMaximum());
+
+ assertTrue("Queue has a negative quota:" + usage, usage > 0);
+
+ }
+ assertEquals(MESSAGE_COUNT, _queue.getMessageCount());
+ assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent());
+ assertTrue("Queue is not flowed.", _queue.isFlowed());
+
+ _queue.registerSubscription(_subscription, false);
+
+ int slept = 0;
+ while (_subscription.getQueueEntries().size() != MESSAGE_COUNT && slept < 10)
+ {
+ Thread.sleep(500);
+ slept++;
+ }
+
+ //Ensure the messages are retreived
+ assertEquals("Not all messages were received, slept:" + slept / 2 + "s", MESSAGE_COUNT, _subscription.getQueueEntries().size());
+
+ //Check the queue is still within it's limits.
+ assertTrue("Queue has gone over quota:" + _queue.getMemoryUsageCurrent(),
+ _queue.getMemoryUsageCurrent() <= _queue.getMemoryUsageMaximum());
+
+ assertTrue("Queue has a negative quota:" + _queue.getMemoryUsageCurrent(), _queue.getMemoryUsageCurrent() >= 0);
+
+ for (int index = 0; index < MESSAGE_COUNT; index++)
+ {
+ // Ensure that we have received the messages and it wasn't flushed to disk before we received it.
+ AMQMessage message = _subscription.getMessages().get(index);
+ assertNotNull("Message:" + message.debugIdentity() + " was null.", message);
+ }
+
}
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 0e2b17914c..5d7fa21d56 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -20,7 +20,6 @@ package org.apache.qpid.server.queue;
*
*/
-import junit.framework.AssertionFailedError;
import junit.framework.TestCase;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.qpid.AMQException;
@@ -60,7 +59,7 @@ public class SimpleAMQQueueTest extends TestCase
protected FieldTable _arguments = null;
MessagePublishInfo info = new MessagePublishInfoImpl();
- private static long MESSAGE_SIZE = 100;
+ protected static long MESSAGE_SIZE = 100;
@Override
protected void setUp() throws Exception
@@ -368,7 +367,7 @@ public class SimpleAMQQueueTest extends TestCase
long MEMORY_MAX = 500;
int MESSAGE_COUNT = (int) MEMORY_MAX * 2;
//Set the Memory Usage to be very low
- _queue.setMemoryUsageMaximum(MEMORY_MAX);
+ _queue.setMemoryUsageMaximum(MEMORY_MAX);
for (int msgCount = 0; msgCount < MESSAGE_COUNT / 2; msgCount++)
{
@@ -395,7 +394,7 @@ public class SimpleAMQQueueTest extends TestCase
assertTrue("Queue has gone over quota:" + usage,
usage <= _queue.getMemoryUsageMaximum());
- assertTrue("Queue has a negative quota:" + usage,usage > 0);
+ assertTrue("Queue has a negative quota:" + usage, usage > 0);
}
assertEquals(MESSAGE_COUNT, _queue.getMessageCount());
@@ -412,13 +411,14 @@ public class SimpleAMQQueueTest extends TestCase
}
//Ensure the messages are retreived
- assertEquals("Not all messages were received, slept:"+slept/2+"s", MESSAGE_COUNT, _subscription.getQueueEntries().size());
+ assertEquals("Not all messages were received, slept:" + slept / 2 + "s", MESSAGE_COUNT, _subscription.getQueueEntries().size());
//Check the queue is still within it's limits.
- assertTrue("Queue has gone over quota:" + _queue.getMemoryUsageCurrent(),
- _queue.getMemoryUsageCurrent() <= _queue.getMemoryUsageMaximum());
+ long current = _queue.getMemoryUsageCurrent();
+ assertTrue("Queue has gone over quota:" + current+"/"+_queue.getMemoryUsageMaximum() ,
+ current <= _queue.getMemoryUsageMaximum());
- assertTrue("Queue has a negative quota:" + _queue.getMemoryUsageCurrent(), _queue.getMemoryUsageCurrent() > 0);
+ assertTrue("Queue has a negative quota:" + _queue.getMemoryUsageCurrent(), _queue.getMemoryUsageCurrent() >= 0);
for (int index = 0; index < MESSAGE_COUNT; index++)
{
@@ -426,10 +426,52 @@ public class SimpleAMQQueueTest extends TestCase
AMQMessage message = _subscription.getMessages().get(index);
assertNotNull("Message:" + message.debugIdentity() + " was null.", message);
}
+ }
+
+ public void testMessagesFlowToDiskPurger() throws AMQException, InterruptedException
+ {
+ // Create IncomingMessage and nondurable queue
+ NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
+
+ MESSAGE_SIZE = 1;
+ long MEMORY_MAX = 10;
+ int MESSAGE_COUNT = (int) MEMORY_MAX;
+ //Set the Memory Usage to be very low
+ _queue.setMemoryUsageMaximum(MEMORY_MAX);
+ for (int msgCount = 0; msgCount < MESSAGE_COUNT; msgCount++)
+ {
+ sendMessage(txnContext);
+ }
+
+ //Check that we can hold all messages without flowing
+ assertEquals(MESSAGE_COUNT, _queue.getMessageCount());
+ assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent());
+ assertTrue("Queue is flowed.", !_queue.isFlowed());
+
+ // Send anothe and ensure we are flowed
+ sendMessage(txnContext);
+ assertEquals(MESSAGE_COUNT + 1, _queue.getMessageCount());
+ assertEquals(MESSAGE_COUNT , _queue.getMemoryUsageCurrent());
+ assertTrue("Queue is not flowed.", _queue.isFlowed());
+
+ _queue.setMemoryUsageMaximum(0L);
+
+ //Give the purger time to work
+ Thread.sleep(200);
+
+ assertEquals(MESSAGE_COUNT + 1, _queue.getMessageCount());
+ assertEquals(0L , _queue.getMemoryUsageCurrent());
+ assertTrue("Queue is not flowed.", _queue.isFlowed());
+
+ }
+
+ protected void sendMessage(TransactionalContext txnContext) throws AMQException
+ {
+ sendMessage(txnContext, 5);
}
- private void sendMessage(TransactionalContext txnContext) throws AMQException
+ protected void sendMessage(TransactionalContext txnContext, int priority) throws AMQException
{
IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_transactionLog), _transactionLog);
@@ -438,6 +480,7 @@ public class SimpleAMQQueueTest extends TestCase
contentHeaderBody.bodySize = MESSAGE_SIZE;
contentHeaderBody.properties = new BasicContentHeaderProperties();
((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2);
+ ((BasicContentHeaderProperties) contentHeaderBody.properties).setPriority((byte) priority);
msg.setContentHeaderBody(contentHeaderBody);
long messageId = msg.getMessageId();
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
index 0c33b406e6..40961a3d2e 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
@@ -48,8 +48,9 @@ public class SimpleAMQQueueThreadPoolTest extends TestCase
//This is +2 because:
// 1 - asyncDelivery Thread
- // 2 - queueInhalerThread
- assertEquals("References not increased", initialCount + 2, ReferenceCountingExecutorService.getInstance().getReferenceCount());
+ // 2 - queue InhalerThread
+ // 3 - queue PurgerThread
+ assertEquals("References not increased", initialCount + 3, ReferenceCountingExecutorService.getInstance().getReferenceCount());
queue.stop();