summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2009-03-26 16:57:21 +0000
committerMartin Ritchie <ritchiem@apache.org>2009-03-26 16:57:21 +0000
commit4c37019df181f3e032c7615e24c43c9858dc71e4 (patch)
treedbf48163181735634df295f9dab5a3f711613a2a /java
parent91a7ec3f337d2fc8aacd672b0455e0b85a144309 (diff)
downloadqpid-python-4c37019df181f3e032c7615e24c43c9858dc71e4.tar.gz
QPID-1768 : Removed all the special priority queue code. Added the ability for a FlowableBaseQueueEntryList to delegate its accounting to a parent QueueEntryList. This results in the PriorityQueueEntryList using the same FtD algorithm as SimpleQELs.
- New Messages on a flowed queue are pushed optimistically pushed to disk, this should potentially be removed and just rely on the purger to flush the correct message which in the Priority case may not be the last message in. - When space is available messages are loaded in queue order, so in this case Priority order. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@758742 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java91
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java292
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java10
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryPriorityTest.java17
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java7
6 files changed, 89 insertions, 335 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
index 4b387eac53..ca7fc5abf6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
@@ -54,6 +54,7 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList
protected boolean _disableFlowToDisk;
private AtomicReference<MessagePurger> _asynchronousPurger = new AtomicReference(null);
private static final int BATCH_PROCESS_COUNT = 100;
+ protected FlowableBaseQueueEntryList _parentQueue;
FlowableBaseQueueEntryList(AMQQueue queue)
{
@@ -89,7 +90,7 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList
{
if (_log.isDebugEnabled())
{
- _log.debug(prefix + " Queue(" + _queue + ":" + _queue.getName() + ") usage:" + memoryUsed()
+ _log.debug(prefix + " Queue(" + _queue.getName() + ") usage:" + memoryUsed()
+ "/" + getMemoryUsageMinimum() + "<>" + getMemoryUsageMaximum()
+ "/" + dataSize());
}
@@ -97,7 +98,14 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList
public boolean isFlowed()
{
- return _flowed.get();
+ if (_parentQueue != null)
+ {
+ return _parentQueue.isFlowed();
+ }
+ else
+ {
+ return _flowed.get();
+ }
}
public int size()
@@ -204,12 +212,19 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList
*/
public void entryUnloadedUpdateMemory(QueueEntry queueEntry)
{
- if (!_disableFlowToDisk && _atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0)
+ if (_parentQueue != null)
{
- _log.error("InMemory Count just went below 0:" + queueEntry.debugIdentity());
+ _parentQueue.entryUnloadedUpdateMemory(queueEntry);
}
+ else
+ {
+ if (!_disableFlowToDisk && _atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0)
+ {
+ _log.error("InMemory Count just went below 0:" + queueEntry.debugIdentity());
+ }
- checkAndStartInhaler();
+ checkAndStartInhaler();
+ }
}
/**
@@ -219,11 +234,18 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList
*/
public void entryLoadedUpdateMemory(QueueEntry queueEntry)
{
- if (!_disableFlowToDisk && _atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum)
+ if (_parentQueue != null)
{
- _log.error("Loaded to much data!:" + _atomicQueueInMemory.get() + "/" + _memoryUsageMaximum);
- setFlowed(true);
- startPurger();
+ _parentQueue.entryLoadedUpdateMemory(queueEntry);
+ }
+ else
+ {
+ if (!_disableFlowToDisk && _atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum)
+ {
+ _log.error("Loaded to much data!:" + _atomicQueueInMemory.get() + "/" + _memoryUsageMaximum);
+ setFlowed(true);
+ startPurger();
+ }
}
}
@@ -241,28 +263,55 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList
}
}
- protected void incrementCounters(final QueueEntryImpl queueEntry)
+ /**
+ * Mark this queue as part of another QueueEntryList for accounting purposes.
+ *
+ * All Calls from the QueueEntry to the QueueEntryList need to check if there is
+ * a parent QueueEntrylist upon which the action should take place.
+ *
+ * @param queueEntryList The parent queue that is performing accounting.
+ */
+ public void setParentQueueEntryList(FlowableBaseQueueEntryList queueEntryList)
{
- _atomicQueueCount.incrementAndGet();
- _atomicQueueSize.addAndGet(queueEntry.getSize());
- long inUseMemory = _atomicQueueInMemory.addAndGet(queueEntry.getSize());
+ _parentQueue = queueEntryList;
+ }
- if (!_disableFlowToDisk && inUseMemory > _memoryUsageMaximum)
+ protected void incrementCounters(final QueueEntryImpl queueEntry)
+ {
+ if (_parentQueue != null)
+ {
+ _parentQueue.incrementCounters(queueEntry);
+ }
+ else
{
- setFlowed(true);
- queueEntry.unload();
+ _atomicQueueCount.incrementAndGet();
+ _atomicQueueSize.addAndGet(queueEntry.getSize());
+ long inUseMemory = _atomicQueueInMemory.addAndGet(queueEntry.getSize());
+
+ if (!_disableFlowToDisk && inUseMemory > _memoryUsageMaximum)
+ {
+ setFlowed(true);
+ queueEntry.unload();
+ }
}
}
protected void dequeued(QueueEntryImpl queueEntry)
{
- _atomicQueueCount.decrementAndGet();
- _atomicQueueSize.addAndGet(-queueEntry.getSize());
- if (!queueEntry.isFlowed())
+ if (_parentQueue != null)
{
- if (_atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0)
+ _parentQueue.dequeued(queueEntry);
+ }
+ else
+ {
+ _atomicQueueCount.decrementAndGet();
+ _atomicQueueSize.addAndGet(-queueEntry.getSize());
+ if (!queueEntry.isFlowed())
{
- _log.error("InMemory Count just went below 0 on dequeue.");
+ if (_atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0)
+ {
+ _log.error("InMemory Count just went below 0 on dequeue.");
+ }
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java
index fc11dd888a..83c7ebb4f2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java
@@ -39,6 +39,7 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement
for (int i = 0; i < priorities; i++)
{
_priorityLists[i] = new SimpleQueueEntryList(queue);
+ _priorityLists[i].setParentQueueEntryList(this);
}
showUsage("Created:" + _queue.getName());
@@ -66,183 +67,9 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement
index = 0;
}
- long requriedSize = message.getSize();
- // Check and see if list would flow on adding message
- if (!_disableFlowToDisk && !isFlowed() && _priorityLists[index].memoryUsed() + requriedSize > _priorityLists[index].getMemoryUsageMaximum())
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("Message(" + message.debugIdentity() + ") Add of size ("
- + requriedSize + ") will cause flow. Searching for space");
- }
-
- long reclaimed = 0;
-
- //work down the priorities looking for memory
-
- //First: Don't take all the memory. So look for a queue that has more than 50% free
- long currentMax;
- int scavangeIndex = 0;
-
- if (scavangeIndex == index)
- {
- scavangeIndex++;
- }
-
- while (scavangeIndex < _priorities && reclaimed <= requriedSize)
- {
- currentMax = _priorityLists[scavangeIndex].getMemoryUsageMaximum();
- long used = _priorityLists[scavangeIndex].memoryUsed();
-
- if (used < currentMax / 2)
- {
- long newMax = currentMax / 2;
-
- _priorityLists[scavangeIndex].setMemoryUsageMaximum(newMax);
-
- reclaimed += currentMax - newMax;
- if (_log.isDebugEnabled())
- {
- _log.debug("Reclaiming(1) :" + (currentMax - newMax) + "(" + reclaimed + "/" + requriedSize + ") from queue:" + scavangeIndex);
- }
- break;
- }
- else
- {
- scavangeIndex++;
- if (scavangeIndex == index)
- {
- scavangeIndex++;
- }
- }
- }
-
- //Second: Just take the free memory we need
- if (scavangeIndex == _priorities)
- {
- scavangeIndex = 0;
- if (scavangeIndex == index)
- {
- scavangeIndex++;
- }
-
- while (scavangeIndex < _priorities && reclaimed <= requriedSize)
- {
- currentMax = _priorityLists[scavangeIndex].getMemoryUsageMaximum();
- long used = _priorityLists[scavangeIndex].memoryUsed();
-
- if (used < currentMax)
- {
- long newMax = currentMax - used;
-
- // if there are no messages at this priority just take it all
- if (newMax == currentMax)
- {
- newMax = 0;
- }
-
- _priorityLists[scavangeIndex].setMemoryUsageMaximum(newMax);
-
- reclaimed += currentMax - newMax;
- if (_log.isDebugEnabled())
- {
- _log.debug("Reclaiming(2) :" + (currentMax - newMax) + "(" + reclaimed + "/" + requriedSize + ") from queue:" + scavangeIndex);
- }
- break;
- }
- else
- {
- scavangeIndex++;
- if (scavangeIndex == index)
- {
- scavangeIndex++;
- }
- }
- }
-
- //Third: Take required memory
- if (scavangeIndex == _priorities)
- {
- scavangeIndex = 0;
- if (scavangeIndex == index)
- {
- scavangeIndex++;
- }
- while (scavangeIndex < _priorities && reclaimed <= requriedSize)
- {
- currentMax = _priorityLists[scavangeIndex].getMemoryUsageMaximum();
-
- if (currentMax > 0 )
- {
- long newMax = currentMax;
- // Just take the amount of space required for this message.
- if (newMax > requriedSize)
- {
- newMax = requriedSize;
- }
- _priorityLists[scavangeIndex].setMemoryUsageMaximum(newMax);
-
- reclaimed += currentMax - newMax;
- if (_log.isDebugEnabled())
- {
- _log.debug("Reclaiming(3) :" + (currentMax - newMax) + "(" + reclaimed + "/" + requriedSize + ") from queue:" + scavangeIndex);
- }
- break;
- }
- else
- {
- scavangeIndex++;
- if (scavangeIndex == index)
- {
- scavangeIndex++;
- }
- }
- }
- }
- }
-
- //Increment Maximum
- if (reclaimed > 0)
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("Increasing queue(" + index + ") maximum by " + reclaimed
- + " to " + (_priorityLists[index].getMemoryUsageMaximum() + reclaimed));
- }
- _priorityLists[index].setMemoryUsageMaximum(_priorityLists[index].getMemoryUsageMaximum() + reclaimed);
- }
- else
- {
- _log.debug("No space found.");
- }
-
- if (_log.isTraceEnabled())
- {
- showUsage("Add");
- }
- }
-
return _priorityLists[index].add(message);
}
- @Override
- protected void showUsage(String prefix)
- {
- if (_log.isDebugEnabled())
- {
- if (prefix.length() != 0)
- {
- _log.debug(prefix);
- }
- for (int index = 0; index < _priorities; index++)
- {
- QueueEntryList queueEntryList = _priorityLists[index];
- _log.debug("Queue (" + _queue.getName() + ")[" + index + "] usage:" + queueEntryList.memoryUsed()
- + "/" + queueEntryList.getMemoryUsageMaximum()
- + "/" + queueEntryList.dataSize());
- }
- }
- }
public QueueEntry next(QueueEntry node)
{
@@ -338,122 +165,7 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement
}
}
- @Override
- public boolean isFlowed()
- {
- boolean flowed = false;
- boolean full = true;
-
- if (_log.isTraceEnabled())
- {
- showUsage("isFlowed");
- }
-
- for (QueueEntryList queueEntryList : _priorityLists)
- {
- //full = full && queueEntryList.getMemoryUsageMaximum() == queueEntryList.memoryUsed();
- full = full && queueEntryList.getMemoryUsageMaximum() <= queueEntryList.dataSize();
- flowed = flowed || (queueEntryList.isFlowed());
- }
- return flowed && full;
- }
-
- @Override
- public int size()
- {
- int size = 0;
- for (QueueEntryList queueEntryList : _priorityLists)
- {
- size += queueEntryList.size();
- }
-
- return size;
- }
-
- @Override
- public long dataSize()
- {
- int dataSize = 0;
- for (QueueEntryList queueEntryList : _priorityLists)
- {
- dataSize += queueEntryList.dataSize();
- }
-
- return dataSize;
- }
-
- @Override
- public long memoryUsed()
- {
- int memoryUsed = 0;
- for (QueueEntryList queueEntryList : _priorityLists)
- {
- memoryUsed += queueEntryList.memoryUsed();
- }
-
- return memoryUsed;
- }
-
- @Override
- public void setMemoryUsageMaximum(long maximumMemoryUsage)
- {
- _memoryUsageMaximum = maximumMemoryUsage;
-
- if (maximumMemoryUsage >= 0)
- {
- _disableFlowToDisk = false;
- }
-
- long share = maximumMemoryUsage / _priorities;
-
- //Apply a share of the maximum To each prioirty quue
- for (QueueEntryList queueEntryList : _priorityLists)
- {
- queueEntryList.setMemoryUsageMaximum(share);
- }
-
- if (maximumMemoryUsage < 0)
- {
- if (_log.isInfoEnabled())
- {
- _log.info("Disabling Flow to Disk for queue:" + _queue.getName());
- }
- _disableFlowToDisk = true;
- return;
- }
-
- //ensure we use the full allocation of memory
- long remainder = maximumMemoryUsage - (share * _priorities);
- if (remainder > 0)
- {
- _priorityLists[_priorities - 1].setMemoryUsageMaximum(share + remainder);
- }
- }
-
- @Override
- public long getMemoryUsageMaximum()
- {
- return _memoryUsageMaximum;
- }
-
- @Override
- public void setMemoryUsageMinimum(long minimumMemoryUsage)
- {
- _memoryUsageMinimum = minimumMemoryUsage;
-
- //Apply a share of the minimum To each prioirty quue
- for (QueueEntryList queueEntryList : _priorityLists)
- {
- queueEntryList.setMemoryUsageMaximum(minimumMemoryUsage / _priorities);
- }
- }
-
- @Override
- public long getMemoryUsageMinimum()
- {
- return _memoryUsageMinimum;
- }
-
+
@Override
public void stop()
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index b6e6365189..e6223ef4ac 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -155,6 +155,13 @@ public class QueueEntryImpl implements QueueEntry
return (_flags & DELIVERED_TO_CONSUMER) != 0;
}
+ /**
+ * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality).
+ * And for selector efficiency.
+ *
+ * This is now also used to unload the message if this entry is on a flowed queue. As a result this method should
+ * only be called after the message has been sent.
+ */
public void setDeliveredToSubscription()
{
_flags |= DELIVERED_TO_CONSUMER;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
index a58c6eaf7d..2bbdf610de 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
@@ -63,4 +63,14 @@ public interface QueueEntryList
void entryLoadedUpdateMemory(QueueEntry queueEntry);
void stop();
+
+ /**
+ * Mark this queue as part of another QueueEntryList for accounting purposes.
+ *
+ * All Calls from the QueueEntry to the QueueEntryList need to check if there is
+ * a parent QueueEntrylist upon which the action should take place.
+ *
+ * @param queueEntryList The parent queue that is performing accounting.
+ */
+ void setParentQueueEntryList(FlowableBaseQueueEntryList queueEntryList);
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryPriorityTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryPriorityTest.java
index 0d6c5948b4..ee3e727b80 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryPriorityTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryPriorityTest.java
@@ -50,21 +50,4 @@ public class AMQQueueFactoryPriorityTest extends AMQQueueFactoryTest
fail(e.getMessage());
}
}
-
- @Override
- public void testQueueValuesAfterCreation()
- {
- try
- {
- AMQQueue queue = createQueue();
-
- assertEquals("MemoryMaximumSize not set correctly:", MAX_SIZE, queue.getMemoryUsageMaximum());
- //NOTE: Priority queue will show 0 as minimum as the minimum value is actually spread between its sub QELs
- assertEquals("MemoryMinimumSize not 0 as expected for a priority queue:", 0, queue.getMemoryUsageMinimum());
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
- }
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
index 11049a7ae3..0f4230806f 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
@@ -48,11 +48,4 @@ public class MockAMQMessage extends TransientAMQMessage
_contentHeaderBody = new ContentHeaderBody(properties, BasicPublishBodyImpl.CLASS_ID);
_contentBodies = new ArrayList<ContentChunk>();
}
-
-
- @Override
- public long getSize()
- {
- return 0l;
- }
}