summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java91
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java292
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java10
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryPriorityTest.java17
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java7
6 files changed, 89 insertions, 335 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
index 4b387eac53..ca7fc5abf6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
@@ -54,6 +54,7 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList
protected boolean _disableFlowToDisk;
private AtomicReference<MessagePurger> _asynchronousPurger = new AtomicReference(null);
private static final int BATCH_PROCESS_COUNT = 100;
+ protected FlowableBaseQueueEntryList _parentQueue;
FlowableBaseQueueEntryList(AMQQueue queue)
{
@@ -89,7 +90,7 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList
{
if (_log.isDebugEnabled())
{
- _log.debug(prefix + " Queue(" + _queue + ":" + _queue.getName() + ") usage:" + memoryUsed()
+ _log.debug(prefix + " Queue(" + _queue.getName() + ") usage:" + memoryUsed()
+ "/" + getMemoryUsageMinimum() + "<>" + getMemoryUsageMaximum()
+ "/" + dataSize());
}
@@ -97,7 +98,14 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList
public boolean isFlowed()
{
- return _flowed.get();
+ if (_parentQueue != null)
+ {
+ return _parentQueue.isFlowed();
+ }
+ else
+ {
+ return _flowed.get();
+ }
}
public int size()
@@ -204,12 +212,19 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList
*/
public void entryUnloadedUpdateMemory(QueueEntry queueEntry)
{
- if (!_disableFlowToDisk && _atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0)
+ if (_parentQueue != null)
{
- _log.error("InMemory Count just went below 0:" + queueEntry.debugIdentity());
+ _parentQueue.entryUnloadedUpdateMemory(queueEntry);
}
+ else
+ {
+ if (!_disableFlowToDisk && _atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0)
+ {
+ _log.error("InMemory Count just went below 0:" + queueEntry.debugIdentity());
+ }
- checkAndStartInhaler();
+ checkAndStartInhaler();
+ }
}
/**
@@ -219,11 +234,18 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList
*/
public void entryLoadedUpdateMemory(QueueEntry queueEntry)
{
- if (!_disableFlowToDisk && _atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum)
+ if (_parentQueue != null)
{
- _log.error("Loaded to much data!:" + _atomicQueueInMemory.get() + "/" + _memoryUsageMaximum);
- setFlowed(true);
- startPurger();
+ _parentQueue.entryLoadedUpdateMemory(queueEntry);
+ }
+ else
+ {
+ if (!_disableFlowToDisk && _atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum)
+ {
+ _log.error("Loaded to much data!:" + _atomicQueueInMemory.get() + "/" + _memoryUsageMaximum);
+ setFlowed(true);
+ startPurger();
+ }
}
}
@@ -241,28 +263,55 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList
}
}
- protected void incrementCounters(final QueueEntryImpl queueEntry)
+ /**
+ * Mark this queue as part of another QueueEntryList for accounting purposes.
+ *
+ * All Calls from the QueueEntry to the QueueEntryList need to check if there is
+ * a parent QueueEntrylist upon which the action should take place.
+ *
+ * @param queueEntryList The parent queue that is performing accounting.
+ */
+ public void setParentQueueEntryList(FlowableBaseQueueEntryList queueEntryList)
{
- _atomicQueueCount.incrementAndGet();
- _atomicQueueSize.addAndGet(queueEntry.getSize());
- long inUseMemory = _atomicQueueInMemory.addAndGet(queueEntry.getSize());
+ _parentQueue = queueEntryList;
+ }
- if (!_disableFlowToDisk && inUseMemory > _memoryUsageMaximum)
+ protected void incrementCounters(final QueueEntryImpl queueEntry)
+ {
+ if (_parentQueue != null)
+ {
+ _parentQueue.incrementCounters(queueEntry);
+ }
+ else
{
- setFlowed(true);
- queueEntry.unload();
+ _atomicQueueCount.incrementAndGet();
+ _atomicQueueSize.addAndGet(queueEntry.getSize());
+ long inUseMemory = _atomicQueueInMemory.addAndGet(queueEntry.getSize());
+
+ if (!_disableFlowToDisk && inUseMemory > _memoryUsageMaximum)
+ {
+ setFlowed(true);
+ queueEntry.unload();
+ }
}
}
protected void dequeued(QueueEntryImpl queueEntry)
{
- _atomicQueueCount.decrementAndGet();
- _atomicQueueSize.addAndGet(-queueEntry.getSize());
- if (!queueEntry.isFlowed())
+ if (_parentQueue != null)
{
- if (_atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0)
+ _parentQueue.dequeued(queueEntry);
+ }
+ else
+ {
+ _atomicQueueCount.decrementAndGet();
+ _atomicQueueSize.addAndGet(-queueEntry.getSize());
+ if (!queueEntry.isFlowed())
{
- _log.error("InMemory Count just went below 0 on dequeue.");
+ if (_atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0)
+ {
+ _log.error("InMemory Count just went below 0 on dequeue.");
+ }
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java
index fc11dd888a..83c7ebb4f2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java
@@ -39,6 +39,7 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement
for (int i = 0; i < priorities; i++)
{
_priorityLists[i] = new SimpleQueueEntryList(queue);
+ _priorityLists[i].setParentQueueEntryList(this);
}
showUsage("Created:" + _queue.getName());
@@ -66,183 +67,9 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement
index = 0;
}
- long requriedSize = message.getSize();
- // Check and see if list would flow on adding message
- if (!_disableFlowToDisk && !isFlowed() && _priorityLists[index].memoryUsed() + requriedSize > _priorityLists[index].getMemoryUsageMaximum())
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("Message(" + message.debugIdentity() + ") Add of size ("
- + requriedSize + ") will cause flow. Searching for space");
- }
-
- long reclaimed = 0;
-
- //work down the priorities looking for memory
-
- //First: Don't take all the memory. So look for a queue that has more than 50% free
- long currentMax;
- int scavangeIndex = 0;
-
- if (scavangeIndex == index)
- {
- scavangeIndex++;
- }
-
- while (scavangeIndex < _priorities && reclaimed <= requriedSize)
- {
- currentMax = _priorityLists[scavangeIndex].getMemoryUsageMaximum();
- long used = _priorityLists[scavangeIndex].memoryUsed();
-
- if (used < currentMax / 2)
- {
- long newMax = currentMax / 2;
-
- _priorityLists[scavangeIndex].setMemoryUsageMaximum(newMax);
-
- reclaimed += currentMax - newMax;
- if (_log.isDebugEnabled())
- {
- _log.debug("Reclaiming(1) :" + (currentMax - newMax) + "(" + reclaimed + "/" + requriedSize + ") from queue:" + scavangeIndex);
- }
- break;
- }
- else
- {
- scavangeIndex++;
- if (scavangeIndex == index)
- {
- scavangeIndex++;
- }
- }
- }
-
- //Second: Just take the free memory we need
- if (scavangeIndex == _priorities)
- {
- scavangeIndex = 0;
- if (scavangeIndex == index)
- {
- scavangeIndex++;
- }
-
- while (scavangeIndex < _priorities && reclaimed <= requriedSize)
- {
- currentMax = _priorityLists[scavangeIndex].getMemoryUsageMaximum();
- long used = _priorityLists[scavangeIndex].memoryUsed();
-
- if (used < currentMax)
- {
- long newMax = currentMax - used;
-
- // if there are no messages at this priority just take it all
- if (newMax == currentMax)
- {
- newMax = 0;
- }
-
- _priorityLists[scavangeIndex].setMemoryUsageMaximum(newMax);
-
- reclaimed += currentMax - newMax;
- if (_log.isDebugEnabled())
- {
- _log.debug("Reclaiming(2) :" + (currentMax - newMax) + "(" + reclaimed + "/" + requriedSize + ") from queue:" + scavangeIndex);
- }
- break;
- }
- else
- {
- scavangeIndex++;
- if (scavangeIndex == index)
- {
- scavangeIndex++;
- }
- }
- }
-
- //Third: Take required memory
- if (scavangeIndex == _priorities)
- {
- scavangeIndex = 0;
- if (scavangeIndex == index)
- {
- scavangeIndex++;
- }
- while (scavangeIndex < _priorities && reclaimed <= requriedSize)
- {
- currentMax = _priorityLists[scavangeIndex].getMemoryUsageMaximum();
-
- if (currentMax > 0 )
- {
- long newMax = currentMax;
- // Just take the amount of space required for this message.
- if (newMax > requriedSize)
- {
- newMax = requriedSize;
- }
- _priorityLists[scavangeIndex].setMemoryUsageMaximum(newMax);
-
- reclaimed += currentMax - newMax;
- if (_log.isDebugEnabled())
- {
- _log.debug("Reclaiming(3) :" + (currentMax - newMax) + "(" + reclaimed + "/" + requriedSize + ") from queue:" + scavangeIndex);
- }
- break;
- }
- else
- {
- scavangeIndex++;
- if (scavangeIndex == index)
- {
- scavangeIndex++;
- }
- }
- }
- }
- }
-
- //Increment Maximum
- if (reclaimed > 0)
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("Increasing queue(" + index + ") maximum by " + reclaimed
- + " to " + (_priorityLists[index].getMemoryUsageMaximum() + reclaimed));
- }
- _priorityLists[index].setMemoryUsageMaximum(_priorityLists[index].getMemoryUsageMaximum() + reclaimed);
- }
- else
- {
- _log.debug("No space found.");
- }
-
- if (_log.isTraceEnabled())
- {
- showUsage("Add");
- }
- }
-
return _priorityLists[index].add(message);
}
- @Override
- protected void showUsage(String prefix)
- {
- if (_log.isDebugEnabled())
- {
- if (prefix.length() != 0)
- {
- _log.debug(prefix);
- }
- for (int index = 0; index < _priorities; index++)
- {
- QueueEntryList queueEntryList = _priorityLists[index];
- _log.debug("Queue (" + _queue.getName() + ")[" + index + "] usage:" + queueEntryList.memoryUsed()
- + "/" + queueEntryList.getMemoryUsageMaximum()
- + "/" + queueEntryList.dataSize());
- }
- }
- }
public QueueEntry next(QueueEntry node)
{
@@ -338,122 +165,7 @@ public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implement
}
}
- @Override
- public boolean isFlowed()
- {
- boolean flowed = false;
- boolean full = true;
-
- if (_log.isTraceEnabled())
- {
- showUsage("isFlowed");
- }
-
- for (QueueEntryList queueEntryList : _priorityLists)
- {
- //full = full && queueEntryList.getMemoryUsageMaximum() == queueEntryList.memoryUsed();
- full = full && queueEntryList.getMemoryUsageMaximum() <= queueEntryList.dataSize();
- flowed = flowed || (queueEntryList.isFlowed());
- }
- return flowed && full;
- }
-
- @Override
- public int size()
- {
- int size = 0;
- for (QueueEntryList queueEntryList : _priorityLists)
- {
- size += queueEntryList.size();
- }
-
- return size;
- }
-
- @Override
- public long dataSize()
- {
- int dataSize = 0;
- for (QueueEntryList queueEntryList : _priorityLists)
- {
- dataSize += queueEntryList.dataSize();
- }
-
- return dataSize;
- }
-
- @Override
- public long memoryUsed()
- {
- int memoryUsed = 0;
- for (QueueEntryList queueEntryList : _priorityLists)
- {
- memoryUsed += queueEntryList.memoryUsed();
- }
-
- return memoryUsed;
- }
-
- @Override
- public void setMemoryUsageMaximum(long maximumMemoryUsage)
- {
- _memoryUsageMaximum = maximumMemoryUsage;
-
- if (maximumMemoryUsage >= 0)
- {
- _disableFlowToDisk = false;
- }
-
- long share = maximumMemoryUsage / _priorities;
-
- //Apply a share of the maximum To each prioirty quue
- for (QueueEntryList queueEntryList : _priorityLists)
- {
- queueEntryList.setMemoryUsageMaximum(share);
- }
-
- if (maximumMemoryUsage < 0)
- {
- if (_log.isInfoEnabled())
- {
- _log.info("Disabling Flow to Disk for queue:" + _queue.getName());
- }
- _disableFlowToDisk = true;
- return;
- }
-
- //ensure we use the full allocation of memory
- long remainder = maximumMemoryUsage - (share * _priorities);
- if (remainder > 0)
- {
- _priorityLists[_priorities - 1].setMemoryUsageMaximum(share + remainder);
- }
- }
-
- @Override
- public long getMemoryUsageMaximum()
- {
- return _memoryUsageMaximum;
- }
-
- @Override
- public void setMemoryUsageMinimum(long minimumMemoryUsage)
- {
- _memoryUsageMinimum = minimumMemoryUsage;
-
- //Apply a share of the minimum To each prioirty quue
- for (QueueEntryList queueEntryList : _priorityLists)
- {
- queueEntryList.setMemoryUsageMaximum(minimumMemoryUsage / _priorities);
- }
- }
-
- @Override
- public long getMemoryUsageMinimum()
- {
- return _memoryUsageMinimum;
- }
-
+
@Override
public void stop()
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index b6e6365189..e6223ef4ac 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -155,6 +155,13 @@ public class QueueEntryImpl implements QueueEntry
return (_flags & DELIVERED_TO_CONSUMER) != 0;
}
+ /**
+ * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality).
+ * And for selector efficiency.
+ *
+ * This is now also used to unload the message if this entry is on a flowed queue. As a result this method should
+ * only be called after the message has been sent.
+ */
public void setDeliveredToSubscription()
{
_flags |= DELIVERED_TO_CONSUMER;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
index a58c6eaf7d..2bbdf610de 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
@@ -63,4 +63,14 @@ public interface QueueEntryList
void entryLoadedUpdateMemory(QueueEntry queueEntry);
void stop();
+
+ /**
+ * Mark this queue as part of another QueueEntryList for accounting purposes.
+ *
+ * All Calls from the QueueEntry to the QueueEntryList need to check if there is
+ * a parent QueueEntrylist upon which the action should take place.
+ *
+ * @param queueEntryList The parent queue that is performing accounting.
+ */
+ void setParentQueueEntryList(FlowableBaseQueueEntryList queueEntryList);
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryPriorityTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryPriorityTest.java
index 0d6c5948b4..ee3e727b80 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryPriorityTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryPriorityTest.java
@@ -50,21 +50,4 @@ public class AMQQueueFactoryPriorityTest extends AMQQueueFactoryTest
fail(e.getMessage());
}
}
-
- @Override
- public void testQueueValuesAfterCreation()
- {
- try
- {
- AMQQueue queue = createQueue();
-
- assertEquals("MemoryMaximumSize not set correctly:", MAX_SIZE, queue.getMemoryUsageMaximum());
- //NOTE: Priority queue will show 0 as minimum as the minimum value is actually spread between its sub QELs
- assertEquals("MemoryMinimumSize not 0 as expected for a priority queue:", 0, queue.getMemoryUsageMinimum());
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
- }
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
index 11049a7ae3..0f4230806f 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
@@ -48,11 +48,4 @@ public class MockAMQMessage extends TransientAMQMessage
_contentHeaderBody = new ContentHeaderBody(properties, BasicPublishBodyImpl.CLASS_ID);
_contentBodies = new ArrayList<ContentChunk>();
}
-
-
- @Override
- public long getSize()
- {
- return 0l;
- }
}