From 6b02c4c08fdc26de6e048357111d9e0b85ab4927 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Fri, 25 Jul 2014 15:25:32 +0000 Subject: QPID-5907 : [Java Broker] Remove unreferenced messages from the store in asynchronous store recoverer process git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1613449 13f79535-47bb-0310-9956-ffa450edef68 --- .../AsynchronousMessageStoreRecoverer.java | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java index 9742404225..750efc23ae 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java @@ -43,6 +43,7 @@ import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.handler.DistributedTransactionHandler; +import org.apache.qpid.server.store.handler.MessageHandler; import org.apache.qpid.server.store.handler.MessageInstanceHandler; import org.apache.qpid.server.txn.DtxBranch; import org.apache.qpid.server.txn.DtxRegistry; @@ -141,10 +142,26 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer private synchronized void completeRecovery() { // at this point nothing should be writing to the map of recovered messages - for (MessageReference> entry : _recoveredMessages.values()) + for (Map.Entry>> entry : _recoveredMessages.entrySet()) { - entry.release(); + entry.getValue().release(); + entry.setValue(null); // free up any memory associated with the reference object } + getStore().visitMessages(new MessageHandler() + { + @Override + public boolean handle(final StoredMessage storedMessage) + { + + long messageNumber = storedMessage.getMessageNumber(); + if(!_recoveredMessages.containsKey(messageNumber)) + { + _logger.info("Message id " + messageNumber + " in store, but not in any queue - removing...."); + storedMessage.remove(); + } + return messageNumber <_maxMessageId-1; + } + }); _recoveredMessages.clear(); } -- cgit v1.2.1