diff options
| author | Keith Wall <kwall@apache.org> | 2014-11-05 13:32:54 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2014-11-05 13:32:54 +0000 |
| commit | 3c2f41d29f26526565d302d5d1c01584eb0c0ab2 (patch) | |
| tree | af5f8a7515ff671a4da82beaf7e56f5fd26a0bf7 /qpid/java/broker-core | |
| parent | d942858418286c716b0dac33b888a3f4b994ea41 (diff) | |
| download | qpid-python-3c2f41d29f26526565d302d5d1c01584eb0c0ab2.tar.gz | |
QPID-6214: [Java Broker] Change asynch recoverer to allow for task cancellation
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1636871 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core')
4 files changed, 80 insertions, 17 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index a0a89d9f12..52d779f877 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -161,7 +161,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte private MessageDestination _defaultDestination; private MessageStore _messageStore; - + private MessageStoreRecoverer _messageStoreRecoverer; public AbstractVirtualHost(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode) { @@ -688,6 +688,11 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte { try { + if (_messageStoreRecoverer != null) + { + _messageStoreRecoverer.cancel(); + } + getMessageStore().closeMessageStore(); } catch (StoreException e) @@ -1411,16 +1416,15 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte createDefaultExchanges(); } - MessageStoreRecoverer messageStoreRecoverer; if(getContextValue(Boolean.class, USE_ASYNC_RECOVERY)) { - messageStoreRecoverer = new AsynchronousMessageStoreRecoverer(); + _messageStoreRecoverer = new AsynchronousMessageStoreRecoverer(); } else { - messageStoreRecoverer = new SynchronousMessageStoreRecoverer(); + _messageStoreRecoverer = new SynchronousMessageStoreRecoverer(); } - messageStoreRecoverer.recover(this); + _messageStoreRecoverer.recover(this); State finalState = State.ERRORED; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java index 899cfdcd6e..628b0ea08c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java @@ -27,6 +27,9 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.log4j.Logger; @@ -56,17 +59,28 @@ import org.apache.qpid.transport.util.Functions; public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer { private static final Logger _logger = Logger.getLogger(AsynchronousMessageStoreRecoverer.class); + private AsynchronousRecoverer _asynchronousRecoverer; @Override public void recover(final VirtualHostImpl virtualHost) { - AsynchronousRecoverer asynchronousRecoverer = new AsynchronousRecoverer(virtualHost); + _asynchronousRecoverer = new AsynchronousRecoverer(virtualHost); - asynchronousRecoverer.recover(); + _asynchronousRecoverer.recover(); + } + + @Override + public void cancel() + { + if (_asynchronousRecoverer != null) + { + _asynchronousRecoverer.cancel(); + } } private static class AsynchronousRecoverer { + public static final int THREAD_POOL_SHUTDOWN_TIMEOUT = 5000; private final VirtualHostImpl<?, ?, ?> _virtualHost; private final EventLogger _eventLogger; private final MessageStore _store; @@ -75,7 +89,8 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer private final Set<AMQQueue<?>> _recoveringQueues = new CopyOnWriteArraySet<>(); private final AtomicBoolean _recoveryComplete = new AtomicBoolean(); private final Map<Long, MessageReference<? extends ServerMessage<?>>> _recoveredMessages = new HashMap<>(); - + private final ExecutorService _queueRecoveryExecutor = Executors.newCachedThreadPool(); + private AtomicBoolean _continueRecovery = new AtomicBoolean(true); private AsynchronousRecoverer(final VirtualHostImpl<?, ?, ?> virtualHost) { @@ -95,8 +110,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer for(AMQQueue<?> queue : _recoveringQueues) { - Thread queueThread = new Thread(new QueueRecoveringTask(queue), "Queue Recoverer : " + queue.getName() + " (vh: " + getVirtualHost().getName() + ")"); - queueThread.start(); + _queueRecoveryExecutor.submit(new QueueRecoveringTask(queue)); } } @@ -161,14 +175,18 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer { messagesToDelete.add(storedMessage); } - return messageNumber <_maxMessageId-1; + return _continueRecovery.get() && messageNumber <_maxMessageId-1; } }); for(StoredMessage<?> storedMessage : messagesToDelete) { - - _logger.info("Message id " + storedMessage.getMessageNumber() + " in store, but not in any queue - removing...."); - storedMessage.remove(); + if (_continueRecovery.get()) + { + _logger.info("Message id " + + storedMessage.getMessageNumber() + + " in store, but not in any queue - removing...."); + storedMessage.remove(); + } } messagesToDelete.clear(); @@ -198,6 +216,25 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer return ref == null ? null : ref.getMessage(); } + public void cancel() + { + _continueRecovery.set(false); + _queueRecoveryExecutor.shutdown(); + try + { + boolean wasShutdown = _queueRecoveryExecutor.awaitTermination(THREAD_POOL_SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + if (!wasShutdown) + { + _logger.warn("Failed to gracefully shutdown queue recovery executor within permitted time period"); + _queueRecoveryExecutor.shutdownNow(); + } + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + } + } + private class DistributedTransactionVisitor implements DistributedTransactionHandler { @@ -335,7 +372,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer branch.setState(DtxBranch.State.PREPARED); branch.prePrepareTransaction(); - return true; + return _continueRecovery.get(); } private StringBuilder xidAsString(Xid id) @@ -364,7 +401,17 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer @Override public void run() { - recoverQueue(_queue); + String originalThreadName = Thread.currentThread().getName(); + Thread.currentThread().setName("Queue Recoverer : " + _queue.getName() + " (vh: " + getVirtualHost().getName() + ")"); + + try + { + recoverQueue(_queue); + } + finally + { + Thread.currentThread().setName(originalThreadName); + } } } @@ -408,7 +455,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer txn.dequeueMessage(_queue, new DummyMessage(messageId)); txn.commitTranAsync(); } - return true; + return _continueRecovery.get(); } else { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java index 9f4c7dd319..7500d4e6e0 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java @@ -23,4 +23,10 @@ package org.apache.qpid.server.virtualhost; public interface MessageStoreRecoverer { void recover(VirtualHostImpl virtualHost); + + /** + * Cancels any in-progress message store recovery. If message store recovery has already + * completed, this method call has no effect. + */ + void cancel(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java index becf4a073c..1cdafde945 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java @@ -113,6 +113,12 @@ public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer } + @Override + public void cancel() + { + // No-op + } + private static class MessageVisitor implements MessageHandler { |
