From 3c2f41d29f26526565d302d5d1c01584eb0c0ab2 Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Wed, 5 Nov 2014 13:32:54 +0000 Subject: QPID-6214: [Java Broker] Change asynch recoverer to allow for task cancellation git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1636871 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/virtualhost/AbstractVirtualHost.java | 14 +++-- .../AsynchronousMessageStoreRecoverer.java | 71 ++++++++++++++++++---- .../server/virtualhost/MessageStoreRecoverer.java | 6 ++ .../SynchronousMessageStoreRecoverer.java | 6 ++ 4 files changed, 80 insertions(+), 17 deletions(-) (limited to 'qpid/java/broker-core/src') diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index a0a89d9f12..52d779f877 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -161,7 +161,7 @@ public abstract class AbstractVirtualHost> exte private MessageDestination _defaultDestination; private MessageStore _messageStore; - + private MessageStoreRecoverer _messageStoreRecoverer; public AbstractVirtualHost(final Map attributes, VirtualHostNode virtualHostNode) { @@ -688,6 +688,11 @@ public abstract class AbstractVirtualHost> exte { try { + if (_messageStoreRecoverer != null) + { + _messageStoreRecoverer.cancel(); + } + getMessageStore().closeMessageStore(); } catch (StoreException e) @@ -1411,16 +1416,15 @@ public abstract class AbstractVirtualHost> exte createDefaultExchanges(); } - MessageStoreRecoverer messageStoreRecoverer; if(getContextValue(Boolean.class, USE_ASYNC_RECOVERY)) { - messageStoreRecoverer = new AsynchronousMessageStoreRecoverer(); + _messageStoreRecoverer = new AsynchronousMessageStoreRecoverer(); } else { - messageStoreRecoverer = new SynchronousMessageStoreRecoverer(); + _messageStoreRecoverer = new SynchronousMessageStoreRecoverer(); } - messageStoreRecoverer.recover(this); + _messageStoreRecoverer.recover(this); State finalState = State.ERRORED; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java index 899cfdcd6e..628b0ea08c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java @@ -27,6 +27,9 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.log4j.Logger; @@ -56,17 +59,28 @@ import org.apache.qpid.transport.util.Functions; public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer { private static final Logger _logger = Logger.getLogger(AsynchronousMessageStoreRecoverer.class); + private AsynchronousRecoverer _asynchronousRecoverer; @Override public void recover(final VirtualHostImpl virtualHost) { - AsynchronousRecoverer asynchronousRecoverer = new AsynchronousRecoverer(virtualHost); + _asynchronousRecoverer = new AsynchronousRecoverer(virtualHost); - asynchronousRecoverer.recover(); + _asynchronousRecoverer.recover(); + } + + @Override + public void cancel() + { + if (_asynchronousRecoverer != null) + { + _asynchronousRecoverer.cancel(); + } } private static class AsynchronousRecoverer { + public static final int THREAD_POOL_SHUTDOWN_TIMEOUT = 5000; private final VirtualHostImpl _virtualHost; private final EventLogger _eventLogger; private final MessageStore _store; @@ -75,7 +89,8 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer private final Set> _recoveringQueues = new CopyOnWriteArraySet<>(); private final AtomicBoolean _recoveryComplete = new AtomicBoolean(); private final Map>> _recoveredMessages = new HashMap<>(); - + private final ExecutorService _queueRecoveryExecutor = Executors.newCachedThreadPool(); + private AtomicBoolean _continueRecovery = new AtomicBoolean(true); private AsynchronousRecoverer(final VirtualHostImpl virtualHost) { @@ -95,8 +110,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer for(AMQQueue queue : _recoveringQueues) { - Thread queueThread = new Thread(new QueueRecoveringTask(queue), "Queue Recoverer : " + queue.getName() + " (vh: " + getVirtualHost().getName() + ")"); - queueThread.start(); + _queueRecoveryExecutor.submit(new QueueRecoveringTask(queue)); } } @@ -161,14 +175,18 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer { messagesToDelete.add(storedMessage); } - return messageNumber <_maxMessageId-1; + return _continueRecovery.get() && messageNumber <_maxMessageId-1; } }); for(StoredMessage storedMessage : messagesToDelete) { - - _logger.info("Message id " + storedMessage.getMessageNumber() + " in store, but not in any queue - removing...."); - storedMessage.remove(); + if (_continueRecovery.get()) + { + _logger.info("Message id " + + storedMessage.getMessageNumber() + + " in store, but not in any queue - removing...."); + storedMessage.remove(); + } } messagesToDelete.clear(); @@ -198,6 +216,25 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer return ref == null ? null : ref.getMessage(); } + public void cancel() + { + _continueRecovery.set(false); + _queueRecoveryExecutor.shutdown(); + try + { + boolean wasShutdown = _queueRecoveryExecutor.awaitTermination(THREAD_POOL_SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + if (!wasShutdown) + { + _logger.warn("Failed to gracefully shutdown queue recovery executor within permitted time period"); + _queueRecoveryExecutor.shutdownNow(); + } + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + } + } + private class DistributedTransactionVisitor implements DistributedTransactionHandler { @@ -335,7 +372,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer branch.setState(DtxBranch.State.PREPARED); branch.prePrepareTransaction(); - return true; + return _continueRecovery.get(); } private StringBuilder xidAsString(Xid id) @@ -364,7 +401,17 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer @Override public void run() { - recoverQueue(_queue); + String originalThreadName = Thread.currentThread().getName(); + Thread.currentThread().setName("Queue Recoverer : " + _queue.getName() + " (vh: " + getVirtualHost().getName() + ")"); + + try + { + recoverQueue(_queue); + } + finally + { + Thread.currentThread().setName(originalThreadName); + } } } @@ -408,7 +455,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer txn.dequeueMessage(_queue, new DummyMessage(messageId)); txn.commitTranAsync(); } - return true; + return _continueRecovery.get(); } else { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java index 9f4c7dd319..7500d4e6e0 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java @@ -23,4 +23,10 @@ package org.apache.qpid.server.virtualhost; public interface MessageStoreRecoverer { void recover(VirtualHostImpl virtualHost); + + /** + * Cancels any in-progress message store recovery. If message store recovery has already + * completed, this method call has no effect. + */ + void cancel(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java index becf4a073c..1cdafde945 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java @@ -113,6 +113,12 @@ public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer } + @Override + public void cancel() + { + // No-op + } + private static class MessageVisitor implements MessageHandler { -- cgit v1.2.1