diff options
| author | Keith Wall <kwall@apache.org> | 2012-05-09 12:17:57 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2012-05-09 12:17:57 +0000 |
| commit | eac63c52d9b8cd6f722bade2ab92861bfdd5f30d (patch) | |
| tree | a0d75f5468d99399181fd09d1545a22774845345 /qpid/java/broker/src/main | |
| parent | f982f86ceede56c2fa153e1cef21a31a75b5a669 (diff) | |
| download | qpid-python-eac63c52d9b8cd6f722bade2ab92861bfdd5f30d.tar.gz | |
QPID-3979: [Java Broker] Last value queue memory leak
The failure to remove entries from the _latestValuesMap caused leak of ConflationQueueList entries.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1336127 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/main')
| -rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java | 85 |
1 files changed, 64 insertions, 21 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java index 75e6f2cfdc..d8467d2d8e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java @@ -25,6 +25,8 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; +import java.util.Collections; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; @@ -52,29 +54,29 @@ public class ConflationQueueList extends SimpleQueueEntryList return new ConflationQueueEntry(this, message); } - @Override public ConflationQueueEntry add(final ServerMessage message) { - ConflationQueueEntry entry = (ConflationQueueEntry) (super.add(message)); - AtomicReference<QueueEntry> latestValueReference = null; + final ConflationQueueEntry entry = (ConflationQueueEntry) (super.add(message)); - Object value = message.getMessageHeader().getHeader(_conflationKey); - if(value != null) + final Object keyValue = message.getMessageHeader().getHeader(_conflationKey); + if (keyValue != null) { - latestValueReference = _latestValuesMap.get(value); - if(latestValueReference == null) - { - _latestValuesMap.putIfAbsent(value, new AtomicReference<QueueEntry>(entry)); - latestValueReference = _latestValuesMap.get(value); - } + final AtomicReference<QueueEntry> referenceToEntry = new AtomicReference<QueueEntry>(entry); + AtomicReference<QueueEntry> latestValueReference = null; QueueEntry oldEntry; + // Iterate until we have got a valid atomic reference object and either the referent is newer than the current + // entry, or the current entry has replaced it in the reference. Note that the head represents a special value + // indicating that the reference object is no longer valid (it is being removed from the map). do { + latestValueReference = getOrPutIfAbsent(keyValue, referenceToEntry); oldEntry = latestValueReference.get(); } - while(oldEntry.compareTo(entry) < 0 && !latestValueReference.compareAndSet(oldEntry, entry)); + while(oldEntry.compareTo(entry) < 0 + && oldEntry != getHead() + && !latestValueReference.compareAndSet(oldEntry, entry)); if(oldEntry.compareTo(entry) < 0) { @@ -85,14 +87,24 @@ public class ConflationQueueList extends SimpleQueueEntryList { // A newer entry came along discardEntry(entry); - } + + entry.setLatestValueReference(latestValueReference); } - entry.setLatestValueReference(latestValueReference); return entry; } + private AtomicReference<QueueEntry> getOrPutIfAbsent(final Object key, final AtomicReference<QueueEntry> referenceToValue) + { + AtomicReference<QueueEntry> latestValueReference = _latestValuesMap.putIfAbsent(key, referenceToValue); + if(latestValueReference == null) + { + latestValueReference = _latestValuesMap.get(key); + } + return latestValueReference; + } + private void discardEntry(final QueueEntry entry) { if(entry.acquire()) @@ -101,11 +113,13 @@ public class ConflationQueueList extends SimpleQueueEntryList txn.dequeue(entry.getQueue(),entry.getMessage(), new ServerTransaction.Action() { + @Override public void postCommit() { entry.discard(); } + @Override public void onRollback() { @@ -117,7 +131,6 @@ public class ConflationQueueList extends SimpleQueueEntryList private final class ConflationQueueEntry extends SimpleQueueEntryImpl { - private AtomicReference<QueueEntry> _latestValueReference; public ConflationQueueEntry(SimpleQueueEntryList queueEntryList, ServerMessage message) @@ -125,25 +138,56 @@ public class ConflationQueueList extends SimpleQueueEntryList super(queueEntryList, message); } - + @Override public void release() { super.release(); - if(_latestValueReference != null) + discardIfReleasedEntryIsNoLongerLatest(); + } + + @Override + public boolean delete() + { + if(super.delete()) { - if(_latestValueReference.get() != this) + if(_latestValueReference != null && _latestValueReference.compareAndSet(this, getHead())) { - discardEntry(this); + Object key = getMessageHeader().getHeader(_conflationKey); + _latestValuesMap.remove(key,_latestValueReference); } + return true; + } + else + { + return false; } - } public void setLatestValueReference(final AtomicReference<QueueEntry> latestValueReference) { _latestValueReference = latestValueReference; } + + private void discardIfReleasedEntryIsNoLongerLatest() + { + if(_latestValueReference != null) + { + if(_latestValueReference.get() != this) + { + discardEntry(this); + } + } + } + + } + + /** + * Exposed purposes of unit test only. + */ + Map<Object, AtomicReference<QueueEntry>> getLatestValuesMap() + { + return Collections.unmodifiableMap(_latestValuesMap); } static class Factory implements QueueEntryListFactory @@ -160,5 +204,4 @@ public class ConflationQueueList extends SimpleQueueEntryList return new ConflationQueueList(queue, _conflationKey); } } - } |
