summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2012-05-09 12:17:57 +0000
committerKeith Wall <kwall@apache.org>2012-05-09 12:17:57 +0000
commiteac63c52d9b8cd6f722bade2ab92861bfdd5f30d (patch)
treea0d75f5468d99399181fd09d1545a22774845345 /qpid/java/broker/src/main
parentf982f86ceede56c2fa153e1cef21a31a75b5a669 (diff)
downloadqpid-python-eac63c52d9b8cd6f722bade2ab92861bfdd5f30d.tar.gz
QPID-3979: [Java Broker] Last value queue memory leak
The failure to remove entries from the _latestValuesMap caused leak of ConflationQueueList entries. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1336127 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/main')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java85
1 files changed, 64 insertions, 21 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
index 75e6f2cfdc..d8467d2d8e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
@@ -25,6 +25,8 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
+import java.util.Collections;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
@@ -52,29 +54,29 @@ public class ConflationQueueList extends SimpleQueueEntryList
return new ConflationQueueEntry(this, message);
}
-
@Override
public ConflationQueueEntry add(final ServerMessage message)
{
- ConflationQueueEntry entry = (ConflationQueueEntry) (super.add(message));
- AtomicReference<QueueEntry> latestValueReference = null;
+ final ConflationQueueEntry entry = (ConflationQueueEntry) (super.add(message));
- Object value = message.getMessageHeader().getHeader(_conflationKey);
- if(value != null)
+ final Object keyValue = message.getMessageHeader().getHeader(_conflationKey);
+ if (keyValue != null)
{
- latestValueReference = _latestValuesMap.get(value);
- if(latestValueReference == null)
- {
- _latestValuesMap.putIfAbsent(value, new AtomicReference<QueueEntry>(entry));
- latestValueReference = _latestValuesMap.get(value);
- }
+ final AtomicReference<QueueEntry> referenceToEntry = new AtomicReference<QueueEntry>(entry);
+ AtomicReference<QueueEntry> latestValueReference = null;
QueueEntry oldEntry;
+ // Iterate until we have got a valid atomic reference object and either the referent is newer than the current
+ // entry, or the current entry has replaced it in the reference. Note that the head represents a special value
+ // indicating that the reference object is no longer valid (it is being removed from the map).
do
{
+ latestValueReference = getOrPutIfAbsent(keyValue, referenceToEntry);
oldEntry = latestValueReference.get();
}
- while(oldEntry.compareTo(entry) < 0 && !latestValueReference.compareAndSet(oldEntry, entry));
+ while(oldEntry.compareTo(entry) < 0
+ && oldEntry != getHead()
+ && !latestValueReference.compareAndSet(oldEntry, entry));
if(oldEntry.compareTo(entry) < 0)
{
@@ -85,14 +87,24 @@ public class ConflationQueueList extends SimpleQueueEntryList
{
// A newer entry came along
discardEntry(entry);
-
}
+
+ entry.setLatestValueReference(latestValueReference);
}
- entry.setLatestValueReference(latestValueReference);
return entry;
}
+ private AtomicReference<QueueEntry> getOrPutIfAbsent(final Object key, final AtomicReference<QueueEntry> referenceToValue)
+ {
+ AtomicReference<QueueEntry> latestValueReference = _latestValuesMap.putIfAbsent(key, referenceToValue);
+ if(latestValueReference == null)
+ {
+ latestValueReference = _latestValuesMap.get(key);
+ }
+ return latestValueReference;
+ }
+
private void discardEntry(final QueueEntry entry)
{
if(entry.acquire())
@@ -101,11 +113,13 @@ public class ConflationQueueList extends SimpleQueueEntryList
txn.dequeue(entry.getQueue(),entry.getMessage(),
new ServerTransaction.Action()
{
+ @Override
public void postCommit()
{
entry.discard();
}
+ @Override
public void onRollback()
{
@@ -117,7 +131,6 @@ public class ConflationQueueList extends SimpleQueueEntryList
private final class ConflationQueueEntry extends SimpleQueueEntryImpl
{
-
private AtomicReference<QueueEntry> _latestValueReference;
public ConflationQueueEntry(SimpleQueueEntryList queueEntryList, ServerMessage message)
@@ -125,25 +138,56 @@ public class ConflationQueueList extends SimpleQueueEntryList
super(queueEntryList, message);
}
-
+ @Override
public void release()
{
super.release();
- if(_latestValueReference != null)
+ discardIfReleasedEntryIsNoLongerLatest();
+ }
+
+ @Override
+ public boolean delete()
+ {
+ if(super.delete())
{
- if(_latestValueReference.get() != this)
+ if(_latestValueReference != null && _latestValueReference.compareAndSet(this, getHead()))
{
- discardEntry(this);
+ Object key = getMessageHeader().getHeader(_conflationKey);
+ _latestValuesMap.remove(key,_latestValueReference);
}
+ return true;
+ }
+ else
+ {
+ return false;
}
-
}
public void setLatestValueReference(final AtomicReference<QueueEntry> latestValueReference)
{
_latestValueReference = latestValueReference;
}
+
+ private void discardIfReleasedEntryIsNoLongerLatest()
+ {
+ if(_latestValueReference != null)
+ {
+ if(_latestValueReference.get() != this)
+ {
+ discardEntry(this);
+ }
+ }
+ }
+
+ }
+
+ /**
+ * Exposed purposes of unit test only.
+ */
+ Map<Object, AtomicReference<QueueEntry>> getLatestValuesMap()
+ {
+ return Collections.unmodifiableMap(_latestValuesMap);
}
static class Factory implements QueueEntryListFactory
@@ -160,5 +204,4 @@ public class ConflationQueueList extends SimpleQueueEntryList
return new ConflationQueueList(queue, _conflationKey);
}
}
-
}